http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/hbase-configuration-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/hbase-configuration-test.cc b/hbase-native-client/src/hbase/client/hbase-configuration-test.cc new file mode 100644 index 0000000..173c4df --- /dev/null +++ b/hbase-native-client/src/hbase/client/hbase-configuration-test.cc @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <fstream> +#include <iostream> + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <boost/filesystem.hpp> +#include "hbase/client/configuration.h" +#include "hbase/client/hbase-configuration-loader.h" + +using namespace hbase; +using std::experimental::nullopt; + +const std::string kDefHBaseConfPath("./build/test-data/hbase-configuration-test/conf/"); +const std::string kHBaseConfPath("./build/test-data/hbase-configuration-test/custom-conf/"); + +const std::string kHBaseDefaultXml("hbase-default.xml"); +const std::string kHBaseSiteXml("hbase-site.xml"); + +const std::string kHBaseDefaultXmlData( + "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" " + "href=\"configuration.xsl\"?>\n<!--\n/**\n *\n * Licensed to the Apache " + "Software Foundation (ASF) under one\n * or more contributor license " + "agreements. See the NOTICE file\n * distributed with this work for " + "additional information\n * regarding copyright ownership. The ASF " + "licenses this file\n * to you under the Apache License, Version 2.0 " + "(the\n * \"License\"); you may not use this file except in compliance\n * " + "with the License. 
You may obtain a copy of the License at\n *\n * " + "http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by " + "applicable law or agreed to in writing, software\n * distributed under " + "the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES " + "OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License " + "for the specific language governing permissions and\n * limitations under " + "the License.\n " + "*/\n-->\n<configuration>\n\n<property>\n<name>hbase.rootdir</" + "name>\n<value>/root/hbase-docker/apps/hbase/data</value>\n<final>true</" + "final>\n</" + "property>\n\n<property>\n<name>hbase.zookeeper.property.datadir</" + "name>\n<value>This value will be " + "overwritten</value>\n<final>false</final>\n</" + "property>\n\n<property>\n<name>default-prop</name>\n<value>default-value</" + "value>\n</property>\n\n</configuration>"); +const std::string kHBaseSiteXmlData( + "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" " + "href=\"configuration.xsl\"?>\n<!--\n/**\n *\n * Licensed to the Apache " + "Software Foundation (ASF) under one\n * or more contributor license " + "agreements. See the NOTICE file\n * distributed with this work for " + "additional information\n * regarding copyright ownership. The ASF " + "licenses this file\n * to you under the Apache License, Version 2.0 " + "(the\n * \"License\"); you may not use this file except in compliance\n * " + "with the License. 
You may obtain a copy of the License at\n *\n * " + "http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by " + "applicable law or agreed to in writing, software\n * distributed under " + "the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES " + "OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License " + "for the specific language governing permissions and\n * limitations under " + "the License.\n " + "*/\n-->\n<configuration>\n\n<property>\n<name>hbase.rootdir</" + "name>\n<value>This value will not be be " + "overwritten</value>\n</" + "property>\n\n<property>\n<name>hbase.zookeeper.property.datadir</" + "name>\n<value>/root/hbase-docker/zookeeper</value>\n</" + "property>\n\n<property>\n<name>hbase-client.user.name</" + "name>\n<value>${user.name}</value>\n</" + "property>\n\n<property>\n<name>hbase-client.user.dir</" + "name>\n<value>${user.dir}</value>\n</" + "property>\n\n<property>\n<name>hbase-client.user.home</" + "name>\n<value>${user.home}</value>\n</" + "property>\n\n<property>\n<name>selfRef</name>\n<value>${selfRef}</" + "value>\n</property>\n\n<property>\n<name>foo.substs</" + "name>\n<value>${bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar},${" + "bar},${bar},</value>\n</" + "property>\n\n<property>\n<name>foo.substs.exception</" + "name>\n<value>${bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar},${" + "bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar},${bar}" + ",${bar},${bar}</value>\n</property>\n\n<property>\n<name>bar</" + "name>\n<value>bar-value</value>\n</" + "property>\n\n<property>\n<name>custom-prop</name>\n<value>custom-value</" + "value>\n</property>\n\n<property>\n<name>int</name>\n<value>16000</" + "value>\n</property>\n\n<property>\n<name>int.largevalue</" + "name>\n<value>2147483646</value>\n</" + "property>\n\n<property>\n<name>int.exception</name>\n<value>2147483648</" + "value>\n</property>\n\n<property>\n<name>long</name>\n<value>2147483850</" + 
"value>\n</property>\n\n<property>\n<name>long.largevalue</" + "name>\n<value>9223372036854775807</value>\n</" + "property>\n\n<property>\n<name>long.exception</" + "name>\n<value>9223372036854775810</value>\n</" + "property>\n\n<property>\n<name>double</name>\n<value>17.9769e+100</" + "value>\n</property>\n\n<property>\n<name>double.largevalue</" + "name>\n<value>170.769e+200</value>\n</" + "property>\n\n<property>\n<name>double.exception</" + "name>\n<value>1.79769e+310</value>\n</" + "property>\n\n<property>\n<name>bool.true</name>\n<value>true</value>\n</" + "property>\n\n<property>\n<name>bool.false</name>\n<value>false</value>\n</" + "property>\n\n<property>\n<name>bool.exception</name>\n<value>unknown " + "bool</value>\n</property>\n\n</configuration>"); + +void WriteDataToFile(const std::string &file, const std::string &xml_data) { + std::ofstream hbase_conf; + hbase_conf.open(file.c_str()); + hbase_conf << xml_data; + hbase_conf.close(); +} + +void CreateHBaseConf(const std::string &dir, const std::string &file, const std::string xml_data) { + // Directory will be created if not present + if (!boost::filesystem::exists(dir)) { + boost::filesystem::create_directories(dir); + } + // Remove temp file always + boost::filesystem::remove((dir + file).c_str()); + WriteDataToFile((dir + file), xml_data); +} + +void CreateHBaseConfWithEnv() { + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseDefaultXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseSiteXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1); +} + +/* + * Config will be loaded from $HBASE_CONF. We set it @ kDefHBaseConfPath + * Config values will be loaded from hbase-default.xml and hbase-site.xml + * present in the above path. + */ +TEST(Configuration, LoadConfFromDefaultLocation) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseDefaultXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseSiteXmlData); + setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 0); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("custom-prop", "Set this value").c_str(), "custom-value"); + EXPECT_STREQ((*conf).Get("default-prop", "Set this value").c_str(), "default-value"); +} + +/* + * Config will be loaded from hbase-site.xml defined at + * kHBaseConfPath + */ +TEST(Configuration, LoadConfFromCustomLocation) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConf(kHBaseConfPath, kHBaseSiteXml, kHBaseSiteXmlData); + + HBaseConfigurationLoader loader; + std::vector<std::string> resources{kHBaseSiteXml}; + hbase::optional<Configuration> conf = loader.LoadResources(kHBaseConfPath, resources); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("custom-prop", "").c_str(), "custom-value"); + EXPECT_STRNE((*conf).Get("custom-prop", "").c_str(), "some-value"); +} + +/* + * Config will be loaded from hbase-default.xml and hbase-site.xml @ + * kDefHBaseConfPath and kHBaseConfPath respectively. + */ +TEST(Configuration, LoadConfFromMultipleLocatons) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseDefaultXmlData); + CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseSiteXmlData); + CreateHBaseConf(kHBaseConfPath, kHBaseDefaultXml, kHBaseDefaultXmlData); + CreateHBaseConf(kHBaseConfPath, kHBaseSiteXml, kHBaseSiteXmlData); + + HBaseConfigurationLoader loader; + std::string conf_paths = kDefHBaseConfPath + ":" + kHBaseConfPath; + std::vector<std::string> resources{kHBaseDefaultXml, kHBaseSiteXml}; + hbase::optional<Configuration> conf = loader.LoadResources(conf_paths, resources); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("default-prop", "From hbase-default.xml").c_str(), "default-value"); + EXPECT_STREQ((*conf).Get("custom-prop", "").c_str(), "custom-value"); + EXPECT_STRNE((*conf).Get("custom-prop", "").c_str(), "some-value"); +} + +/* + * Config will be loaded from hbase-default.xml and hbase-site.xml @ + * $HBASE_CONF. + * We set HBASE_CONF to kDefHBaseConfPath + * Below tests load the conf files in the same way unless specified. + */ +TEST(Configuration, DefaultValues) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("default-prop", "Set this value.").c_str(), "default-value"); + EXPECT_STREQ((*conf).Get("custom-prop", "Set this value.").c_str(), "custom-value"); +} + +TEST(Configuration, FinalValues) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("hbase.rootdir", "").c_str(), "/root/hbase-docker/apps/hbase/data"); + EXPECT_STREQ((*conf).Get("hbase.zookeeper.property.datadir", "").c_str(), + "/root/hbase-docker/zookeeper"); + EXPECT_STRNE((*conf).Get("hbase.rootdir", "").c_str(), "This value will not be be overwritten"); + EXPECT_STRNE((*conf).Get("hbase.zookeeper.property.datadir", "").c_str(), + "This value will be overwritten"); +} + +/* + * Config will be loaded from HBASE_CONF which we set in + * CreateHBaseConfWithEnv(). + * Config values will be loaded from hbase-default.xml and hbase-site.xml in the + * above path. + */ +TEST(Configuration, EnvVars) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("hbase-client.user.name", "").c_str(), "${user.name}"); + EXPECT_STRNE((*conf).Get("hbase-client.user.name", "root").c_str(), "test-user"); +} + +TEST(Configuration, SelfRef) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("selfRef", "${selfRef}").c_str(), "${selfRef}"); +} + +TEST(Configuration, VarExpansion) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_STREQ((*conf).Get("foo.substs", "foo-value").c_str(), + "bar-value,bar-value,bar-value,bar-value,bar-value,bar-value," + "bar-value,bar-value,bar-value,bar-value,"); + EXPECT_STRNE((*conf).Get("foo.substs", "foo-value").c_str(), "bar-value"); +} + +TEST(Configuration, VarExpansionException) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + ASSERT_THROW((*conf).Get("foo.substs.exception", "foo-value").c_str(), std::runtime_error); +} + +TEST(Configuration, GetInt) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_EQ(16000, (*conf).GetInt("int", 0)); + EXPECT_EQ(2147483646, (*conf).GetInt("int.largevalue", 0)); +} + +TEST(Configuration, GetLong) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_EQ(2147483850, (*conf).GetLong("long", 0)); + EXPECT_EQ(9223372036854775807, (*conf).GetLong("long.largevalue", 0)); +} + +TEST(Configuration, GetDouble) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_DOUBLE_EQ(17.9769e+100, (*conf).GetDouble("double", 0.0)); + EXPECT_DOUBLE_EQ(170.769e+200, (*conf).GetDouble("double.largevalue", 0.0)); +} + +TEST(Configuration, GetBool) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + EXPECT_EQ(true, (*conf).GetBool("bool.true", true)); + EXPECT_EQ(false, (*conf).GetBool("bool.false", false)); +} + +TEST(Configuration, GetIntException) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + ASSERT_THROW((*conf).GetInt("int.exception", 0), std::runtime_error); +} + +TEST(Configuration, GetLongException) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + ASSERT_THROW((*conf).GetLong("long.exception", 0), std::runtime_error); +} + +TEST(Configuration, GetDoubleException) { + // Remove already configured env if present. 
+ unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + ASSERT_THROW((*conf).GetDouble("double.exception", 0), std::runtime_error); +} + +TEST(Configuration, GetBoolException) { + // Remove already configured env if present. + unsetenv("HBASE_CONF"); + CreateHBaseConfWithEnv(); + + HBaseConfigurationLoader loader; + hbase::optional<Configuration> conf = loader.LoadDefaultResources(); + ASSERT_TRUE(conf != nullopt) << "No configuration object present."; + ASSERT_THROW((*conf).GetBool("bool.exception", false), std::runtime_error); +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/hbase-rpc-controller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/hbase-rpc-controller.cc b/hbase-native-client/src/hbase/client/hbase-rpc-controller.cc new file mode 100644 index 0000000..c2ae6cc --- /dev/null +++ b/hbase-native-client/src/hbase/client/hbase-rpc-controller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/hbase-rpc-controller.h" + +namespace hbase {} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/increment-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/increment-test.cc b/hbase-native-client/src/hbase/client/increment-test.cc new file mode 100644 index 0000000..3c11fbe --- /dev/null +++ b/hbase-native-client/src/hbase/client/increment-test.cc @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "hbase/client/increment.h" +#include "hbase/client/mutation.h" +#include "hbase/client/put.h" +#include "hbase/utils/time-util.h" + +using hbase::Increment; +using hbase::Increment; +using hbase::Cell; +using hbase::CellType; +using hbase::Mutation; +using hbase::TimeUtil; + +const constexpr int64_t Mutation::kLatestTimestamp; + +TEST(Increment, Row) { + Increment incr{"foo"}; + EXPECT_EQ("foo", incr.row()); +} + +TEST(Increment, Durability) { + Increment incr{"row"}; + EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, incr.Durability()); + + auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL; + incr.SetDurability(skipWal); + EXPECT_EQ(skipWal, incr.Durability()); +} + +TEST(Increment, Timestamp) { + Increment incr{"row"}; + + // test default timestamp + EXPECT_EQ(Mutation::kLatestTimestamp, incr.TimeStamp()); + + // set custom timestamp + auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); + incr.SetTimeStamp(ts); + EXPECT_EQ(ts, incr.TimeStamp()); + + // Add a column with custom timestamp + incr.AddColumn("f", "q", 5l); + auto &cell = incr.FamilyMap().at("f")[0]; + EXPECT_EQ(ts, cell->Timestamp()); +} + +TEST(Increment, HasFamilies) { + Increment incr{"row"}; + + EXPECT_EQ(false, 
incr.HasFamilies()); + + incr.AddColumn("f", "q", 5l); + EXPECT_EQ(true, incr.HasFamilies()); +} + +TEST(Increment, Add) { + CellType cell_type = CellType::PUT; + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + + // add first cell + Increment incr{"row"}; + incr.Add(std::move(cell)); + EXPECT_EQ(1, incr.FamilyMap().size()); + EXPECT_EQ(1, incr.FamilyMap().at(family).size()); + + // add a non-matching row + auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + Increment incr2{"foo"}; + ASSERT_THROW(incr2.Add(std::move(cell2)), std::runtime_error); // rows don't match + + // add a second cell with same family + auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type); + incr.Add(std::move(cell3)); + EXPECT_EQ(1, incr.FamilyMap().size()); + EXPECT_EQ(2, incr.FamilyMap().at(family).size()); + + // add a cell to a different family + auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type); + incr.Add(std::move(cell4)); + EXPECT_EQ(2, incr.FamilyMap().size()); + EXPECT_EQ(1, incr.FamilyMap().at("family-2").size()); +} + +TEST(Increment, AddColumn) { + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + + Increment incr{"row"}; + incr.AddColumn(family, column, 5l); + EXPECT_EQ(1, incr.FamilyMap().size()); + EXPECT_EQ(1, incr.FamilyMap().at(family).size()); + + // add a second cell with same family + incr.AddColumn(family, "column-2", 6l); + EXPECT_EQ(1, incr.FamilyMap().size()); + EXPECT_EQ(2, incr.FamilyMap().at(family).size()); + + // add a cell to a different family + incr.AddColumn("family-2", column, 7l); + EXPECT_EQ(2, incr.FamilyMap().size()); + EXPECT_EQ(1, 
incr.FamilyMap().at("family-2").size()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/increment.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/increment.cc b/hbase-native-client/src/hbase/client/increment.cc new file mode 100644 index 0000000..a8ac49a --- /dev/null +++ b/hbase-native-client/src/hbase/client/increment.cc @@ -0,0 +1,56 @@ + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/increment.h" +#include <folly/Conv.h> +#include <algorithm> +#include <limits> +#include <stdexcept> +#include <utility> + +#include "hbase/utils/bytes-util.h" + +namespace hbase { + +/** + * @brief Increment the column from the specific family with the specified qualifier + * by the specified amount. 
+ * @param family family name + * @param qualifier column qualifier + * @param amount amount to increment by + */ +Increment& Increment::AddColumn(const std::string& family, const std::string& qualifier, + int64_t amount) { + family_map_[family].push_back(std::move(std::make_unique<Cell>( + row_, family, qualifier, timestamp_, BytesUtil::ToString(amount), hbase::CellType::PUT))); + return *this; +} +Increment& Increment::Add(std::unique_ptr<Cell> cell) { + if (cell->Row() != row_) { + throw std::runtime_error("The row in " + cell->DebugString() + + " doesn't match the original one " + row_); + } + + family_map_[cell->Family()].push_back(std::move(cell)); + return *this; +} + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/keyvalue-codec.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/keyvalue-codec.cc b/hbase-native-client/src/hbase/client/keyvalue-codec.cc new file mode 100644 index 0000000..9b6e689 --- /dev/null +++ b/hbase-native-client/src/hbase/client/keyvalue-codec.cc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "hbase/client/keyvalue-codec.h" + +#include <string> + +namespace hbase { + +KeyValueCodec::KVDecoder::KVDecoder(std::unique_ptr<folly::IOBuf> cell_block, uint32_t offset, + uint32_t length) + : cell_block_(std::move(cell_block)), offset_(offset), length_(length) {} + +KeyValueCodec::KVDecoder::~KVDecoder() {} + +std::shared_ptr<Cell> KeyValueCodec::KVDecoder::Decode(folly::io::Cursor &cursor) { + uint32_t key_length = cursor.readBE<uint32_t>(); + uint32_t value_length = cursor.readBE<uint32_t>(); + uint16_t row_length = cursor.readBE<uint16_t>(); + std::string row = cursor.readFixedString(row_length); + uint8_t column_family_length = cursor.readBE<uint8_t>(); + std::string column_family = cursor.readFixedString(column_family_length); + int qualifier_length = + key_length - (row_length + column_family_length + kHBaseSizeOfKeyInfrastructure_); + std::string column_qualifier = cursor.readFixedString(qualifier_length); + uint64_t timestamp = cursor.readBE<uint64_t>(); + uint8_t key_type = cursor.readBE<uint8_t>(); + std::string value = cursor.readFixedString(value_length); + + return std::make_shared<Cell>(row, column_family, column_qualifier, timestamp, value, + static_cast<hbase::CellType>(key_type)); +} + +bool KeyValueCodec::KVDecoder::Advance() { + if (end_of_cell_block_) { + return false; + } + + if (cur_pos_ == length_) { + end_of_cell_block_ = true; + return false; + } + + folly::io::Cursor cursor(cell_block_.get()); + cursor.skip(offset_ + cur_pos_); + uint32_t current_cell_size = cursor.readBE<uint32_t>(); + current_cell_ = Decode(cursor); + cur_pos_ += kHBaseSizeOfInt_ + current_cell_size; + return true; +} + +uint32_t KeyValueCodec::KVDecoder::CellBlockLength() const { return length_; } +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/load-client.cc ---------------------------------------------------------------------- diff --git 
a/hbase-native-client/src/hbase/client/load-client.cc b/hbase-native-client/src/hbase/client/load-client.cc new file mode 100644 index 0000000..a321845 --- /dev/null +++ b/hbase-native-client/src/hbase/client/load-client.cc @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Logging.h> +#include <folly/Random.h> +#include <gflags/gflags.h> + +#include <atomic> +#include <chrono> +#include <iostream> +#include <thread> + +#include "hbase/client/client.h" +#include "hbase/client/get.h" +#include "hbase/client/put.h" +#include "hbase/client/table.h" +#include "hbase/serde/table-name.h" +#include "hbase/utils/time-util.h" + +using hbase::Client; +using hbase::Configuration; +using hbase::Get; +using hbase::Put; +using hbase::Table; +using hbase::pb::TableName; +using hbase::TimeUtil; +using folly::Random; + +DEFINE_string(table, "load_test_table", "What table to do the reads and writes with"); +DEFINE_string(families, "f", "comma separated list of column family names"); +DEFINE_string(conf, "", "Conf directory to read the config from (optional)"); +DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); +DEFINE_string(znode, "/hbase", "parent znode"); +DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read"); +DEFINE_uint64(num_cols, 1000, "How many columns there are in a row"); +DEFINE_int32(threads, 10, "How many client threads"); +DEFINE_int32(batch_num_rows, 100, "number of rows in one multi-get / multi-put"); +DEFINE_uint64(report_num_rows, 5000, "How frequent we should report the progress"); +DEFINE_bool(gets, true, "perform gets"); +DEFINE_bool(scans, true, "perform scans"); +DEFINE_bool(puts, true, "perform put's"); +DEFINE_bool(appends, true, "perform append's"); + +static constexpr const char *kNumColumn = "num"; +static constexpr const char *incrPrefix = "i"; +static constexpr const char *appendPrefix = "a"; + +std::string PrefixZero(int total_width, int num) { + std::string str = std::to_string(num); + int prefix_len = total_width - str.length(); + if (prefix_len > 0) { + return std::string(prefix_len, '0') + str; + } + return str; +} + +bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) { + auto col = std::to_string(m); + if 
(!result->Value(family, col)) { + LOG(ERROR) << "Column:" << col << " is not found for " << result->Row(); + return false; + } + auto l = *(result->Value(family, col)); + if (l != col) { + LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col; + return false; + } + if (FLAGS_appends) { + if (!result->Value(family, incrPrefix + col)) { + LOG(ERROR) << "Column:" << (incrPrefix + col) << " is not found for " << result->Row(); + return false; + } + auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col))); + if (int_val != m) { + LOG(ERROR) << "value is not " << col << " for " << result->Row(); + return false; + } + if (!result->Value(family, appendPrefix + col)) { + LOG(ERROR) << "Column:" << (appendPrefix + col) << " is not found for " << result->Row(); + return false; + } + l = *(result->Value(family, appendPrefix + col)); + if (l != col) { + LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col; + return false; + } + } + + return true; +} + +bool Verify(std::shared_ptr<hbase::Result> result, const std::string &row, + const std::vector<std::string> &families) { + if (result == nullptr || result->IsEmpty()) { + LOG(ERROR) << "didn't get result"; + return false; + } + if (result->Row().compare(row) != 0) { + LOG(ERROR) << "row " << result->Row() << " is not the expected: " << row; + return false; + } + // Test the values + for (auto family : families) { + if (!result->Value(family, kNumColumn)) { + LOG(ERROR) << "Column:" << kNumColumn << " is not found for " << result->Row(); + return false; + } + auto cols = std::stoi(*(result->Value(family, kNumColumn))); + VLOG(3) << "Result for row:" << row << " contains " << std::to_string(cols) << " columns"; + for (int m = 1; m <= cols; m++) { + if (!Verify(result, family, m)) return false; + } + } + return true; +} + +bool DoScan(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table, + const std::vector<std::string> &families) { + 
hbase::Scan scan{}; + auto start = iteration * rows; + auto end = start + rows; + auto width = std::to_string(max_row).length(); + scan.SetStartRow(PrefixZero(width, start)); + if (end != max_row && end != max_row + 1) { + scan.SetStopRow(PrefixZero(width, end)); + } + + auto start_ns = TimeUtil::GetNowNanos(); + auto scanner = table->Scan(scan); + + auto cnt = 0; + auto r = scanner->Next(); + while (r != nullptr) { + auto row = PrefixZero(width, start + cnt); + if (!Verify(r, row, families)) { + return false; + } + cnt++; + r = scanner->Next(); + if (cnt != 0 && cnt % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Scan iterated over " << cnt << " results in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + } + if (cnt != rows) { + LOG(ERROR) << "(Thread " << iteration << ") " + << "Expected number of results does not match. expected:" << rows + << ", actual:" << cnt; + return false; + } + LOG(INFO) << "(Thread " << iteration << ") " + << "scanned " << std::to_string(cnt) << " rows in " << TimeUtil::ElapsedMillis(start_ns) + << " ms."; + return true; +} + +bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table, + const std::vector<std::string> &families, uint64_t batch_num_rows) { + auto width = std::to_string(max_row).length(); + auto start_ns = TimeUtil::GetNowNanos(); + for (uint64_t k = iteration; k <= max_row;) { + uint64_t total_read = 0; + std::vector<hbase::Get> gets; + for (uint64_t i = 0; i < batch_num_rows && k <= max_row; ++i, k += FLAGS_threads) { + std::string row = PrefixZero(width, k); + hbase::Get get(row); + gets.push_back(get); + } + VLOG(3) << "getting for " << batch_num_rows << " rows"; + auto results = table->Get(gets); + if (results.size() != gets.size()) { + LOG(ERROR) << "(Thread " << iteration << ") " + << "Expected number of results does not match. 
expected:" << gets.size() + << ", actual:" << results.size(); + return false; + } + for (uint64_t i = 0; i < batch_num_rows && i < results.size(); ++i) { + if (!Verify(results[i], gets[i].row(), families)) { + return false; + } + } + total_read += gets.size(); + if (total_read != 0 && total_read % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Sent " << total_read << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + k += batch_num_rows; + } + LOG(INFO) << "(Thread " << iteration << ") " + << "Sent " << rows << " gets" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + return true; +} + +void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table, + const std::vector<std::string> &families) { + auto start_ns = TimeUtil::GetNowNanos(); + auto width = std::to_string(max_row).length(); + for (uint64_t j = 0; j < rows; j++) { + std::string row = PrefixZero(width, iteration * rows + j); + auto put = Put{row}; + for (auto family : families) { + auto n_cols = Random::rand32(1, cols); + put.AddColumn(family, kNumColumn, std::to_string(n_cols)); + for (unsigned int k = 1; k <= n_cols; k++) { + put.AddColumn(family, std::to_string(k), std::to_string(k)); + } + } + table->Put(put); + if ((j + 1) % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Written " << std::to_string(j + 1) << " rows in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + } + LOG(INFO) << "(Thread " << iteration << ") " + << "written " << std::to_string(rows) << " rows" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; +} + +bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols, + std::unique_ptr<Table> table, const std::vector<std::string> &families) { + auto start_ns = TimeUtil::GetNowNanos(); + auto width = std::to_string(max_row).length(); + for (uint64_t j = 0; j < rows; j++) { + std::string row = 
PrefixZero(width, iteration * rows + j); + hbase::Get get(row); + auto result = table->Get(get); + for (auto family : families) { + auto n_cols = std::stoi(*(result->Value(family, kNumColumn))); + for (unsigned int k = 1; k <= n_cols; k++) { + table->Increment( + hbase::Increment{row}.AddColumn(family, incrPrefix + std::to_string(k), k)); + if (!table->Append(hbase::Append{row}.Add(family, appendPrefix + std::to_string(k), + std::to_string(k)))) { + LOG(ERROR) << "(Thread " << iteration << ") " + << "append for " << row << " family: " << family << " failed"; + return false; + } + } + } + if ((j + 1) % FLAGS_report_num_rows == 0) + LOG(INFO) << "(Thread " << iteration << ") " + << "Written " << std::to_string(j + 1) << " increments" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + LOG(INFO) << "(Thread " << iteration << ") " + << "written " << std::to_string(rows) << " increments" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + return true; +} + +int main(int argc, char *argv[]) { + gflags::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + google::InstallFailureSignalHandler(); + FLAGS_logtostderr = 1; + FLAGS_stderrthreshold = 1; + + if (FLAGS_batch_num_rows < 1) { + LOG(ERROR) << "size of multi get should be positive"; + return -1; + } + if (!FLAGS_gets && !FLAGS_scans && !FLAGS_puts) { + LOG(ERROR) << "Must perform at least Get or Put operations"; + return -1; + } + std::shared_ptr<Configuration> conf = nullptr; + if (FLAGS_conf == "") { + // Configuration + conf = std::make_shared<Configuration>(); + conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper); + conf->Set("zookeeper.znode.parent", FLAGS_znode); + } else { + setenv("HBASE_CONF", FLAGS_conf.c_str(), 1); + hbase::HBaseConfigurationLoader loader; + conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value()); + } + auto tn = 
std::make_shared<TableName>(folly::to<TableName>(FLAGS_table)); + auto num_puts = FLAGS_num_rows; + + auto client = std::make_unique<Client>(*conf); + + // Do the Put requests + + std::vector<std::string> families; + std::size_t pos = 0, found; + while ((found = FLAGS_families.find_first_of(',', pos)) != std::string::npos) { + families.push_back(FLAGS_families.substr(pos, found - pos)); + pos = found + 1; + } + families.push_back(FLAGS_families.substr(pos)); + + int rows = FLAGS_num_rows / FLAGS_threads; + if (FLAGS_num_rows % FLAGS_threads != 0) rows++; + int cols = FLAGS_num_cols; + std::atomic<int8_t> succeeded{1}; // not using bool since we want atomic &= + if (FLAGS_puts) { + LOG(INFO) << "Sending put requests"; + auto start_ns = TimeUtil::GetNowNanos(); + std::vector<std::thread> writer_threads; + for (int i = 0; i < FLAGS_threads; i++) { + writer_threads.push_back(std::thread([&, i] { + auto table = client->Table(*tn); + DoPut(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families); + })); + } + for (auto &t : writer_threads) { + t.join(); + } + LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + if (FLAGS_appends) { + LOG(INFO) << "Sending append/increment requests"; + auto start_ns = TimeUtil::GetNowNanos(); + std::vector<std::thread> writer_threads; + for (int i = 0; i < FLAGS_threads; i++) { + writer_threads.push_back(std::thread([&, i] { + auto table = client->Table(*tn); + succeeded &= + DoAppendIncrement(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families); + })); + } + for (auto &t : writer_threads) { + t.join(); + } + LOG(INFO) << "Successfully sent " << num_puts << " append requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + + if (FLAGS_scans) { + LOG(INFO) << "Sending scan requests"; + auto start_ns = TimeUtil::GetNowNanos(); + std::vector<std::thread> reader_threads; + for (int i = 0; i < FLAGS_threads; i++) { + 
reader_threads.push_back(std::thread([&, i] { + auto table1 = client->Table(*tn); + succeeded &= DoScan(i, FLAGS_num_rows - 1, rows, std::move(table1), families); + })); + } + for (auto &t : reader_threads) { + t.join(); + } + + LOG(INFO) << (succeeded.load() ? "Successfully " : "Failed. ") << " scannned " << num_puts + << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + + if (FLAGS_gets) { + LOG(INFO) << "Sending get requests"; + auto start_ns = TimeUtil::GetNowNanos(); + std::vector<std::thread> reader_threads; + for (int i = 0; i < FLAGS_threads; i++) { + reader_threads.push_back(std::thread([&, i] { + auto table1 = client->Table(*tn); + succeeded &= + DoGet(i, FLAGS_num_rows - 1, rows, std::move(table1), families, FLAGS_batch_num_rows); + })); + } + for (auto &t : reader_threads) { + t.join(); + } + + LOG(INFO) << (succeeded.load() ? "Successful. " : "Failed. ") << " sent multi-get requests for " + << num_puts << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + client->Close(); + + return succeeded.load() ? 0 : -1; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/location-cache-retry-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/location-cache-retry-test.cc b/hbase-native-client/src/hbase/client/location-cache-retry-test.cc new file mode 100644 index 0000000..283cf85 --- /dev/null +++ b/hbase-native-client/src/hbase/client/location-cache-retry-test.cc @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <gtest/gtest.h> + +#include "hbase/client/append.h" +#include "hbase/client/cell.h" +#include "hbase/client/client.h" +#include "hbase/client/configuration.h" +#include "hbase/client/delete.h" +#include "hbase/client/get.h" +#include "hbase/client/hbase-configuration-loader.h" +#include "hbase/client/increment.h" +#include "hbase/client/meta-utils.h" +#include "hbase/client/put.h" +#include "hbase/client/result.h" +#include "hbase/client/table.h" +#include "hbase/exceptions/exception.h" +#include "hbase/serde/table-name.h" +#include "hbase/test-util/test-util.h" +#include "hbase/utils/bytes-util.h" + +using hbase::Cell; +using hbase::Configuration; +using hbase::Get; +using hbase::MetaUtil; +using hbase::RetriesExhaustedException; +using hbase::Put; +using hbase::Table; +using hbase::TestUtil; + +using std::chrono_literals::operator"" s; + +class LocationCacheRetryTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + test_util->conf()->SetInt("hbase.client.retries.number", 5); + } +}; + +std::unique_ptr<hbase::TestUtil> LocationCacheRetryTest::test_util = nullptr; + +TEST_F(LocationCacheRetryTest, GetFromMetaTable) { + auto tn = folly::to<hbase::pb::TableName>("hbase:meta"); + auto row = "test1"; + + hbase::Client client(*LocationCacheRetryTest::test_util->conf()); + + // do a get against the other table, but not the actual table "t". 
+ auto table = client.Table(tn); + hbase::Get get(row); + auto result = table->Get(get); + + LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, ""); + + std::this_thread::sleep_for(3s); // sleep 3 sec + + result = table->Get(get); +} + +TEST_F(LocationCacheRetryTest, PutGet) { + LocationCacheRetryTest::test_util->CreateTable("t", "d"); + LocationCacheRetryTest::test_util->CreateTable("t2", "d"); + + auto tn = folly::to<hbase::pb::TableName>("t"); + auto tn2 = folly::to<hbase::pb::TableName>("t2"); + auto row = "test1"; + + hbase::Client client(*LocationCacheRetryTest::test_util->conf()); + + // do a get against the other table, but not the actual table "t". + auto table = client.Table(tn); + auto table2 = client.Table(tn2); + hbase::Get get(row); + auto result = table2->Get(get); + + // we should have already cached the location of meta right now. Now + // move the meta region to the other server so that we will get a NotServingRegionException + // when we do the actual location lookup request. If there is no invalidation + // of the meta's own location, then following put/get will result in retries exhausted. 
+ LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, ""); + + std::this_thread::sleep_for(3s); // sleep 3 sec + + table->Put(Put{row}.AddColumn("d", "1", "value1")); + + result = table->Get(get); + + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test1", result->Row()); + EXPECT_EQ("value1", *(result->Value("d", "1"))); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/location-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/location-cache-test.cc b/hbase-native-client/src/hbase/client/location-cache-test.cc new file mode 100644 index 0000000..af25902 --- /dev/null +++ b/hbase-native-client/src/hbase/client/location-cache-test.cc @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#include "hbase/client/location-cache.h" + +#include <folly/Memory.h> +#include <gtest/gtest.h> + +#include <chrono> + +#include "hbase/client/keyvalue-codec.h" +#include "hbase/if/HBase.pb.h" +#include "hbase/serde/table-name.h" +#include "hbase/test-util/test-util.h" + +using hbase::Cell; +using hbase::Configuration; +using hbase::ConnectionPool; +using hbase::MetaUtil; +using hbase::LocationCache; +using hbase::TestUtil; +using hbase::KeyValueCodec; +using std::chrono::milliseconds; + +class LocationCacheTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util_ = std::make_unique<TestUtil>(); + test_util_->StartMiniCluster(2); + } + static void TearDownTestCase() { test_util_.release(); } + + virtual void SetUp() {} + virtual void TearDown() {} + + public: + static std::unique_ptr<TestUtil> test_util_; +}; + +std::unique_ptr<TestUtil> LocationCacheTest::test_util_ = nullptr; + +TEST_F(LocationCacheTest, TestGetMetaNodeContents) { + auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; + auto f = cache.LocateMeta(); + auto result = f.get(); + ASSERT_FALSE(f.hasException()); + ASSERT_TRUE(result.has_port()); + ASSERT_TRUE(result.has_host_name()); + cpu->stop(); + io->stop(); +} + +TEST_F(LocationCacheTest, TestGetRegionLocation) { + auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; + + // If 
there is no table this should throw an exception + auto tn = folly::to<hbase::pb::TableName>("t"); + auto row = "test"; + ASSERT_ANY_THROW(cache.LocateFromMeta(tn, row).get(milliseconds(1000))); + LocationCacheTest::test_util_->CreateTable("t", "d"); + auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(1000)); + ASSERT_TRUE(loc != nullptr); + cpu->stop(); + io->stop(); +} + +TEST_F(LocationCacheTest, TestCaching) { + auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); + auto codec = std::make_shared<KeyValueCodec>(); + auto cp = std::make_shared<ConnectionPool>(io, cpu, codec, LocationCacheTest::test_util_->conf()); + LocationCache cache{LocationCacheTest::test_util_->conf(), io, cpu, cp}; + + auto tn_1 = folly::to<hbase::pb::TableName>("t1"); + auto tn_2 = folly::to<hbase::pb::TableName>("t2"); + auto tn_3 = folly::to<hbase::pb::TableName>("t3"); + auto row_a = "a"; + + // test location pulled from meta gets cached + ASSERT_ANY_THROW(cache.LocateRegion(tn_1, row_a).get(milliseconds(1000))); + ASSERT_ANY_THROW(cache.LocateFromMeta(tn_1, row_a).get(milliseconds(1000))); + LocationCacheTest::test_util_->CreateTable("t1", "d"); + + ASSERT_FALSE(cache.IsLocationCached(tn_1, row_a)); + auto loc = cache.LocateRegion(tn_1, row_a).get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_1, row_a)); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_1, row_a)); + + // test with two regions + std::vector<std::string> keys; + keys.push_back("b"); + LocationCacheTest::test_util_->CreateTable("t2", "d", keys); + + ASSERT_FALSE(cache.IsLocationCached(tn_2, "a")); + loc = cache.LocateRegion(tn_2, "a").get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_2, "a")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "a")); + + ASSERT_FALSE(cache.IsLocationCached(tn_2, "b")); + loc = cache.LocateRegion(tn_2, "b").get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_2, "b")); + 
ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "b")); + ASSERT_TRUE(cache.IsLocationCached(tn_2, "ba")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "ba")); + + // test with three regions + keys.clear(); + keys.push_back("b"); + keys.push_back("c"); + LocationCacheTest::test_util_->CreateTable("t3", "d", keys); + + ASSERT_FALSE(cache.IsLocationCached(tn_3, "c")); + ASSERT_FALSE(cache.IsLocationCached(tn_3, "ca")); + loc = cache.LocateRegion(tn_3, "ca").get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "c")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "c")); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "ca")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ca")); + + ASSERT_FALSE(cache.IsLocationCached(tn_3, "b")); + loc = cache.LocateRegion(tn_3, "b").get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "b")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "b")); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "ba")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ba")); + + // clear second region + cache.ClearCachedLocation(tn_3, "b"); + ASSERT_FALSE(cache.IsLocationCached(tn_3, "b")); + + ASSERT_FALSE(cache.IsLocationCached(tn_3, "a")); + loc = cache.LocateRegion(tn_3, "a").get(milliseconds(1000)); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "a")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "a")); + ASSERT_TRUE(cache.IsLocationCached(tn_3, "abc")); + ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "abc")); + + cpu->stop(); + io->stop(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/location-cache.cc b/hbase-native-client/src/hbase/client/location-cache.cc new file mode 100644 index 0000000..2be2b94 --- /dev/null +++ b/hbase-native-client/src/hbase/client/location-cache.cc @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "hbase/client/location-cache.h" + +#include <folly/Conv.h> +#include <folly/Logging.h> +#include <folly/io/IOBuf.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <map> +#include <shared_mutex> +#include <utility> + +#include "hbase/connection/response.h" +#include "hbase/connection/rpc-connection.h" +#include "hbase/client/meta-utils.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/ZooKeeper.pb.h" +#include "hbase/serde/region-info.h" +#include "hbase/serde/server-name.h" +#include "hbase/serde/zk.h" + +using hbase::pb::MetaRegionServer; +using hbase::pb::ServerName; +using hbase::pb::TableName; + +namespace hbase { + +LocationCache::LocationCache(std::shared_ptr<hbase::Configuration> conf, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<ConnectionPool> cp) + : conf_(conf), + io_executor_(io_executor), + cpu_executor_(cpu_executor), + cp_(cp), + meta_promise_(nullptr), + meta_lock_(), + meta_util_(), + zk_(nullptr), + cached_locations_(), + locations_lock_() { + zk_quorum_ = 
ZKUtil::ParseZooKeeperQuorum(*conf_); + EnsureZooKeeperConnection(); +} + +LocationCache::~LocationCache() { CloseZooKeeperConnection(); } + +void LocationCache::CloseZooKeeperConnection() { + if (zk_ != nullptr) { + zookeeper_close(zk_); + zk_ = nullptr; + LOG(INFO) << "Closed connection to ZooKeeper."; + } +} + +void LocationCache::EnsureZooKeeperConnection() { + if (zk_ == nullptr) { + LOG(INFO) << "Connecting to ZooKeeper. Quorum:" + zk_quorum_; + auto session_timeout = ZKUtil::SessionTimeout(*conf_); + zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, session_timeout, nullptr, nullptr, 0); + } +} + +folly::Future<ServerName> LocationCache::LocateMeta() { + std::lock_guard<std::recursive_mutex> g(meta_lock_); + if (meta_promise_ == nullptr) { + this->RefreshMetaLocation(); + } + return meta_promise_->getFuture().onError([&](const folly::exception_wrapper &ew) { + auto promise = InvalidateMeta(); + if (promise) { + promise->setException(ew); + } + throw ew; + return ServerName{}; + }); +} + +std::shared_ptr<folly::SharedPromise<hbase::pb::ServerName>> LocationCache::InvalidateMeta() { + VLOG(2) << "Invalidating meta location"; + std::lock_guard<std::recursive_mutex> g(meta_lock_); + if (meta_promise_ != nullptr) { + // return the unique_ptr back to the caller. 
+ std::shared_ptr<folly::SharedPromise<hbase::pb::ServerName>> ret = nullptr; + std::swap(ret, meta_promise_); + return ret; + } else { + return nullptr; + } +} + +void LocationCache::RefreshMetaLocation() { + meta_promise_ = std::make_shared<folly::SharedPromise<ServerName>>(); + auto p = meta_promise_; + cpu_executor_->add([this, p] { + std::lock_guard<std::recursive_mutex> g(meta_lock_); + p->setWith([&] { return this->ReadMetaLocation(); }); + }); +} + +// Note: this is a blocking call to zookeeper +ServerName LocationCache::ReadMetaLocation() { + auto buf = folly::IOBuf::create(4096); + ZkDeserializer derser; + EnsureZooKeeperConnection(); + + // This needs to be int rather than size_t as that's what ZK expects. + int len = buf->capacity(); + std::string zk_node = ZKUtil::MetaZNode(*conf_); + int zk_result = zoo_get(this->zk_, zk_node.c_str(), 0, + reinterpret_cast<char *>(buf->writableData()), &len, nullptr); + if (zk_result != ZOK || len < 9) { + LOG(ERROR) << "Error getting meta location."; + // We just close the zk connection, and let the upper levels retry. + CloseZooKeeperConnection(); + throw std::runtime_error("Error getting meta location. Quorum: " + zk_quorum_); + } + buf->append(len); + + MetaRegionServer mrs; + if (derser.Parse(buf.get(), &mrs) == false) { + LOG(ERROR) << "Unable to decode"; + throw std::runtime_error("Error getting meta location (Unable to decode). Quorum: " + + zk_quorum_); + } + return mrs.server(); +} + +folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( + const TableName &tn, const std::string &row) { + return this->LocateMeta() + .via(cpu_executor_.get()) + .then([this](ServerName sn) { + // TODO: use RpcClient? 
+ auto remote_id = std::make_shared<ConnectionId>(sn.host_name(), sn.port()); + return this->cp_->GetConnection(remote_id); + }) + .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) { + return rpc_connection->SendRequest(std::move(meta_util_.MetaRequest(tn, row))); + }) + .onError([&](const folly::exception_wrapper &ew) { + auto promise = InvalidateMeta(); + throw ew; + return static_cast<std::unique_ptr<Response>>(nullptr); + }) + .then([tn, this](std::unique_ptr<Response> resp) { + // take the protobuf response and make it into + // a region location. + return meta_util_.CreateLocation(std::move(*resp), tn); + }) + .then([tn, this](std::shared_ptr<RegionLocation> rl) { + // Make sure that the correct location was found. + if (rl->region_info().table_name().namespace_() != tn.namespace_() || + rl->region_info().table_name().qualifier() != tn.qualifier()) { + throw TableNotFoundException(folly::to<std::string>(tn)); + } + return rl; + }) + .then([this](std::shared_ptr<RegionLocation> rl) { + auto remote_id = + std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port()); + return rl; + }) + .then([tn, this](std::shared_ptr<RegionLocation> rl) { + // now add fetched location to the cache. 
+ this->CacheLocation(tn, rl); + return rl; + }); +} + +constexpr const char *MetaUtil::kMetaRegionName; + +folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateRegion( + const TableName &tn, const std::string &row, const RegionLocateType locate_type, + const int64_t locate_ns) { + // We maybe asked to locate meta itself + if (MetaUtil::IsMeta(tn)) { + return LocateMeta().then([this](const ServerName &server_name) { + auto rl = std::make_shared<RegionLocation>(MetaUtil::kMetaRegionName, + meta_util_.meta_region_info(), server_name); + return rl; + }); + } + + // TODO: implement region locate type and timeout + auto cached_loc = this->GetCachedLocation(tn, row); + if (cached_loc != nullptr) { + return cached_loc; + } else { + return this->LocateFromMeta(tn, row); + } +} + +// must hold shared lock on locations_lock_ +std::shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn, + const std::string &row) { + auto t_locs = this->GetTableLocations(tn); + std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_); + + // looking for the "floor" key as a start key + auto possible_region = t_locs->upper_bound(row); + + if (t_locs->empty()) { + VLOG(5) << "Could not find region in cache, table map is empty"; + return nullptr; + } + + if (possible_region == t_locs->begin()) { + VLOG(5) << "Could not find region in cache, all keys are greater, row:" << row + << " ,possible_region:" << possible_region->second->DebugString(); + return nullptr; + } + --possible_region; + + VLOG(5) << "Found possible region in cache for row:" << row + << " ,possible_region:" << possible_region->second->DebugString(); + + // found possible start key, now need to check end key + if (possible_region->second->region_info().end_key() == "" || + possible_region->second->region_info().end_key() > row) { + VLOG(2) << "Found region in cache for row:" << row + << " ,region:" << possible_region->second->DebugString(); + return 
possible_region->second; + } else { + return nullptr; + } +} + +// must hold unique lock on locations_lock_ +void LocationCache::CacheLocation(const hbase::pb::TableName &tn, + const std::shared_ptr<RegionLocation> loc) { + auto t_locs = this->GetTableLocations(tn); + std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); + + (*t_locs)[loc->region_info().start_key()] = loc; + VLOG(1) << "Cached location for region:" << loc->DebugString(); +} + +// must hold shared lock on locations_lock_ +bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, const std::string &row) { + return (this->GetCachedLocation(tn, row) != nullptr); +} + +// shared lock needed for cases when this table has been requested before; +// in the rare case it hasn't, unique lock will be grabbed to add it to cache +std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetTableLocations( + const hbase::pb::TableName &tn) { + auto found_locs = this->GetCachedTableLocations(tn); + if (found_locs == nullptr) { + found_locs = this->GetNewTableLocations(tn); + } + return found_locs; +} + +std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetCachedTableLocations( + const hbase::pb::TableName &tn) { + folly::SharedMutexWritePriority::ReadHolder r_holder{locations_lock_}; + + auto table_locs = cached_locations_.find(tn); + if (table_locs != cached_locations_.end()) { + return table_locs->second; + } else { + return nullptr; + } +} + +std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetNewTableLocations( + const hbase::pb::TableName &tn) { + // double-check locking under upgradable lock + folly::SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_}; + + auto table_locs = cached_locations_.find(tn); + if (table_locs != cached_locations_.end()) { + return table_locs->second; + } + folly::SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; + + auto t_locs_p = std::make_shared<std::map<std::string, 
std::shared_ptr<RegionLocation>>>(); + cached_locations_.insert(std::make_pair(tn, t_locs_p)); + return t_locs_p; +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCache() { + std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); + cached_locations_.clear(); +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) { + VLOG(1) << "ClearCachedLocations, table:" << folly::to<std::string>(tn); + std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); + cached_locations_.erase(tn); + if (MetaUtil::IsMeta(tn)) { + InvalidateMeta(); + } +} + +// must hold unique lock on locations_lock_ +void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) { + VLOG(1) << "ClearCachedLocation, table:" << folly::to<std::string>(tn) << ", row:" << row; + auto table_locs = this->GetTableLocations(tn); + std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_); + table_locs->erase(row); + if (MetaUtil::IsMeta(tn)) { + InvalidateMeta(); + } +} + +void LocationCache::UpdateCachedLocation(const RegionLocation &loc, + const folly::exception_wrapper &error) { + // TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later. + ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key()); +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/meta-utils.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/meta-utils.cc b/hbase-native-client/src/hbase/client/meta-utils.cc new file mode 100644 index 0000000..338c43e --- /dev/null +++ b/hbase-native-client/src/hbase/client/meta-utils.cc @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/meta-utils.h" + +#include <folly/Conv.h> +#include <memory> +#include <utility> +#include <vector> + +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/client/response-converter.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/serde/region-info.h" +#include "hbase/serde/server-name.h" +#include "hbase/serde/table-name.h" + +using hbase::pb::TableName; +using hbase::pb::RegionInfo; +using hbase::pb::RegionSpecifier_RegionSpecifierType; +using hbase::pb::ScanRequest; +using hbase::pb::ServerName; + +namespace hbase { + +MetaUtil::MetaUtil() { + meta_region_info_.set_start_key(""); + meta_region_info_.set_end_key(""); + meta_region_info_.set_offline(false); + meta_region_info_.set_split(false); + meta_region_info_.set_replica_id(0); + meta_region_info_.set_split(false); + meta_region_info_.set_region_id(1); + meta_region_info_.mutable_table_name()->set_namespace_(MetaUtil::kSystemNamespace); + meta_region_info_.mutable_table_name()->set_qualifier(MetaUtil::kMetaTableQualifier); +} + +std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { + return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); +} + 
+std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, const std::string &row) const { + auto request = Request::scan(); + auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg()); + + msg->set_number_of_rows(1); + msg->set_close_scanner(true); + + // Set the region this scan goes to + auto region = msg->mutable_region(); + region->set_value(MetaUtil::kMetaRegion); + region->set_type( + RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); + + auto scan = msg->mutable_scan(); + // We don't care about before, just now. + scan->set_max_versions(1); + // Meta should be cached at all times. + scan->set_cache_blocks(true); + // We only want one row right now. + // + // TODO(eclark): Figure out if we should get more. + scan->set_caching(1); + // Close the scan after we have data. + scan->set_small(true); + // We know where to start but not where to end. + scan->set_reversed(true); + // Give me everything or nothing. + scan->set_allow_partial_results(false); + + // Set the columns that we need + auto info_col = scan->add_column(); + info_col->set_family(MetaUtil::kCatalogFamily); + info_col->add_qualifier(MetaUtil::kServerColumn); + info_col->add_qualifier(MetaUtil::kRegionInfoColumn); + + scan->set_start_row(RegionLookupRowkey(tn, row)); + return request; +} + +std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp, + const TableName &tn) { + std::vector<std::shared_ptr<Result>> results = ResponseConverter::FromScanResponse(resp); + if (results.size() == 0) { + throw TableNotFoundException(folly::to<std::string>(tn)); + } + if (results.size() != 1) { + throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" + + std::to_string(results.size())); + } + auto result = *results[0]; + + auto region_info_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kRegionInfoColumn); + auto server_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kServerColumn); 
+ CHECK(region_info_str); + CHECK(server_str); + + auto row = result.Row(); + auto region_info = folly::to<RegionInfo>(*region_info_str); + auto server_name = folly::to<ServerName>(*server_str); + return std::make_shared<RegionLocation>(row, std::move(region_info), server_name); +} + +bool MetaUtil::IsMeta(const hbase::pb::TableName &tn) { + return folly::to<std::string>(tn) == MetaUtil::kMetaTableName; +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/multi-response.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/multi-response.cc b/hbase-native-client/src/hbase/client/multi-response.cc new file mode 100644 index 0000000..564e356 --- /dev/null +++ b/hbase-native-client/src/hbase/client/multi-response.cc @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "hbase/client/multi-response.h" +#include <glog/logging.h> +#include "hbase/client/region-result.h" + +using hbase::pb::RegionLoadStats; + +namespace hbase { + +MultiResponse::MultiResponse() {} + +int MultiResponse::Size() const { + int size = 0; + for (const auto& result : results_) { + size += result.second->ResultOrExceptionSize(); + } + return size; +} + +void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index, + std::shared_ptr<Result> result, + std::shared_ptr<folly::exception_wrapper> exc) { + auto itr = results_.find(region_name); + if (itr == results_.end()) { + auto region_result = std::make_shared<RegionResult>(); + region_result->AddResultOrException(original_index, result, exc); + results_[region_name] = region_result; + } else { + itr->second->AddResultOrException(original_index, result, exc); + } +} + +void MultiResponse::AddRegionException(const std::string& region_name, + std::shared_ptr<folly::exception_wrapper> exception) { + VLOG(8) << "Store Region Exception:- " << exception->what() << "; Region[" << region_name << "];"; + bool region_found = false; + auto itr = exceptions_.find(region_name); + if (itr == exceptions_.end()) { + auto region_result = std::make_shared<folly::exception_wrapper>(); + exceptions_[region_name] = exception; + } else { + itr->second = exception; + } +} + +std::shared_ptr<folly::exception_wrapper> MultiResponse::RegionException( + const std::string& region_name) const { + auto find = exceptions_.at(region_name); + return find; +} + +const std::map<std::string, std::shared_ptr<folly::exception_wrapper> >& +MultiResponse::RegionExceptions() const { + return exceptions_; +} + +void MultiResponse::AddStatistic(const std::string& region_name, + std::shared_ptr<RegionLoadStats> stat) { + results_[region_name]->set_stat(stat); +} + +const std::map<std::string, std::shared_ptr<RegionResult> >& MultiResponse::RegionResults() const { + return results_; +} + 
+MultiResponse::~MultiResponse() {} + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/mutation.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/mutation.cc b/hbase-native-client/src/hbase/client/mutation.cc new file mode 100644 index 0000000..5a43b2f --- /dev/null +++ b/hbase-native-client/src/hbase/client/mutation.cc @@ -0,0 +1,69 @@ + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "hbase/client/mutation.h" +#include <algorithm> +#include <limits> +#include <stdexcept> + +namespace hbase { + +Mutation::Mutation(const std::string &row) : Row(row) {} +Mutation::Mutation(const std::string &row, int64_t timestamp) : Row(row), timestamp_(timestamp) {} + +Mutation::Mutation(const Mutation &mutation) { + row_ = mutation.row_; + durability_ = mutation.durability_; + timestamp_ = mutation.timestamp_; + for (auto const &e : mutation.family_map_) { + for (auto const &c : e.second) { + family_map_[e.first].push_back(std::make_unique<Cell>(*c)); + } + } +} + +Mutation &Mutation::operator=(const Mutation &mutation) { + row_ = mutation.row_; + durability_ = mutation.durability_; + timestamp_ = mutation.timestamp_; + for (auto const &e : mutation.family_map_) { + for (auto const &c : e.second) { + family_map_[e.first].push_back(std::make_unique<Cell>(*c)); + } + } + return *this; +} + +pb::MutationProto_Durability Mutation::Durability() const { return durability_; } + +Mutation &Mutation::SetDurability(pb::MutationProto_Durability durability) { + durability_ = durability; + return *this; +} + +bool Mutation::HasFamilies() const { return !family_map_.empty(); } + +std::unique_ptr<Cell> Mutation::CreateCell(const std::string &family, const std::string &qualifier, + int64_t timestamp, const std::string &value) { + return std::make_unique<Cell>(row_, family, qualifier, timestamp, value, hbase::CellType::PUT); +} + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/put-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/put-test.cc b/hbase-native-client/src/hbase/client/put-test.cc new file mode 100644 index 0000000..0657bb6 --- /dev/null +++ b/hbase-native-client/src/hbase/client/put-test.cc @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "hbase/client/mutation.h" +#include "hbase/client/put.h" +#include "hbase/utils/time-util.h" + +using hbase::Put; +using hbase::Cell; +using hbase::CellType; +using hbase::Mutation; +using hbase::TimeUtil; + +const constexpr int64_t Mutation::kLatestTimestamp; + +TEST(Put, Row) { + Put put{"foo"}; + EXPECT_EQ("foo", put.row()); +} + +TEST(Put, Durability) { + Put put{"row"}; + EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, put.Durability()); + + auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL; + put.SetDurability(skipWal); + EXPECT_EQ(skipWal, put.Durability()); +} + +TEST(Put, Timestamp) { + Put put{"row"}; + + // test default timestamp + EXPECT_EQ(Mutation::kLatestTimestamp, put.TimeStamp()); + + // set custom timestamp + auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); + put.SetTimeStamp(ts); + EXPECT_EQ(ts, put.TimeStamp()); + + // Add a column with custom timestamp + put.AddColumn("f", "q", "v"); + auto &cell = put.FamilyMap().at("f")[0]; + EXPECT_EQ(ts, cell->Timestamp()); +} + +TEST(Put, HasFamilies) { + Put put{"row"}; + + EXPECT_EQ(false, put.HasFamilies()); + + put.AddColumn("f", "q", "v"); + EXPECT_EQ(true, put.HasFamilies()); +} + +TEST(Put, 
Add) { + CellType cell_type = CellType::PUT; + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + + // add first cell + Put put{"row"}; + put.Add(std::move(cell)); + EXPECT_EQ(1, put.FamilyMap().size()); + EXPECT_EQ(1, put.FamilyMap().at(family).size()); + + // add a non-matching row + auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + Put put2{"foo"}; + ASSERT_THROW(put2.Add(std::move(cell2)), std::runtime_error); // rows don't match + + // add a second cell with same family + auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type); + put.Add(std::move(cell3)); + EXPECT_EQ(1, put.FamilyMap().size()); + EXPECT_EQ(2, put.FamilyMap().at(family).size()); + + // add a cell to a different family + auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type); + put.Add(std::move(cell4)); + EXPECT_EQ(2, put.FamilyMap().size()); + EXPECT_EQ(1, put.FamilyMap().at("family-2").size()); +} + +TEST(Put, AddColumn) { + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + + Put put{"row"}; + put.AddColumn(family, column, value); + EXPECT_EQ(1, put.FamilyMap().size()); + EXPECT_EQ(1, put.FamilyMap().at(family).size()); + + // add a second cell with same family + put.AddColumn(family, "column-2", value); + EXPECT_EQ(1, put.FamilyMap().size()); + EXPECT_EQ(2, put.FamilyMap().at(family).size()); + + // add a cell to a different family + put.AddColumn("family-2", column, value); + EXPECT_EQ(2, put.FamilyMap().size()); + EXPECT_EQ(1, put.FamilyMap().at("family-2").size()); + + // use the AddColumn overload + auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); + put.AddColumn(family, column, ts, 
value); + EXPECT_EQ(2, put.FamilyMap().size()); + EXPECT_EQ(3, put.FamilyMap().at(family).size()); + auto &cell = put.FamilyMap().at(family)[2]; + EXPECT_EQ(ts, cell->Timestamp()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/put.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/client/put.cc b/hbase-native-client/src/hbase/client/put.cc new file mode 100644 index 0000000..d942d3d --- /dev/null +++ b/hbase-native-client/src/hbase/client/put.cc @@ -0,0 +1,68 @@ + + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "hbase/client/put.h" +#include <folly/Conv.h> +#include <algorithm> +#include <limits> +#include <stdexcept> +#include <utility> + +namespace hbase { + +/** + * @brief Add the specified column and value to this Put operation. + * @param family family name + * @param qualifier column qualifier + * @param value column value + */ +Put& Put::AddColumn(const std::string& family, const std::string& qualifier, + const std::string& value) { + return AddColumn(family, qualifier, timestamp_, value); +} + +/** + * @brief Add the specified column and value to this Put operation. 
+ * @param family family name + * @param qualifier column qualifier + * @param timestamp version timestamp + * @param value column value + */ +Put& Put::AddColumn(const std::string& family, const std::string& qualifier, int64_t timestamp, + const std::string& value) { + if (timestamp < 0) { + throw std::runtime_error("Timestamp cannot be negative. ts=" + + folly::to<std::string>(timestamp)); + } + + return Add(CreateCell(family, qualifier, timestamp, value)); +} + +Put& Put::Add(std::unique_ptr<Cell> cell) { + if (cell->Row() != row_) { + throw std::runtime_error("The row in " + cell->DebugString() + + " doesn't match the original one " + row_); + } + + family_map_[cell->Family()].push_back(std::move(cell)); + return *this; +} +} // namespace hbase