Repository: hbase Updated Branches: refs/heads/HBASE-14850 9786a07ee -> e5643e863
HBASE-18537 [C++] Improvements to load-client Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5643e86 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5643e86 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5643e86 Branch: refs/heads/HBASE-14850 Commit: e5643e8632f8d4c916f1e1ca9537612b7370ace1 Parents: 9786a07 Author: Enis Soztutar <e...@apache.org> Authored: Fri Aug 11 11:09:34 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri Aug 11 11:09:34 2017 -0700 ---------------------------------------------------------------------- hbase-native-client/core/load-client.cc | 213 +++++++++++++++---------- hbase-native-client/core/simple-client.cc | 4 +- 2 files changed, 135 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/load-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/load-client.cc b/hbase-native-client/core/load-client.cc index 67f0d57..8cceeef 100644 --- a/hbase-native-client/core/load-client.cc +++ b/hbase-native-client/core/load-client.cc @@ -63,16 +63,17 @@ static constexpr const char *appendPrefix = "a"; std::string PrefixZero(int total_width, int num) { std::string str = std::to_string(num); - auto prefix_len = total_width - str.length(); - if (prefix_len > 0) return std::string(total_width - str.length(), '0') + str; + int prefix_len = total_width - str.length(); + if (prefix_len > 0) { + return std::string(prefix_len, '0') + str; + } return str; } bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) { auto col = std::to_string(m); - auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col))); - if (int_val != m) { - LOG(ERROR) << "value is not " << col << " for " << result->Row(); + if (!result->Value(family, col)) { + LOG(ERROR) << "Column:" << col << " is not found for " << result->Row(); return false; } auto l = *(result->Value(family, col)); @@ -80,11 +81,52 @@ bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) { LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col; return false; } - l = *(result->Value(family, appendPrefix + col)); - if (l != col) { - LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col; + if (FLAGS_appends) { + if (!result->Value(family, incrPrefix + col)) { + LOG(ERROR) << "Column:" << (incrPrefix + col) << " is not found for " << result->Row(); + return false; + } + auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col))); + if (int_val != m) { + LOG(ERROR) << "value is not " << col << " for " << result->Row(); + return false; + } + if (!result->Value(family, appendPrefix + col)) { + LOG(ERROR) << "Column:" << (appendPrefix + col) << " is not found for " << result->Row(); + return false; + } + l = *(result->Value(family, appendPrefix + col)); + if (l != col) { + LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col; + return false; + } + } + + return true; +} + +bool Verify(std::shared_ptr<hbase::Result> result, const std::string &row, + const std::vector<std::string> &families) { + if (result == nullptr || result->IsEmpty()) { + LOG(ERROR) << "didn't get result"; return false; } + if (result->Row().compare(row) != 0) { + LOG(ERROR) << "row " << result->Row() << " is not the expected: " << row; + return false; + } + // Test the values + for (auto family : families) { + if (!result->Value(family, kNumColumn)) { + LOG(ERROR) << "Column:" << kNumColumn << " is not found for " << result->Row(); + return false; + } + auto cols = std::stoi(*(result->Value(family, kNumColumn))); + VLOG(3) << "Result for row:" << row << " contains " << std::to_string(cols) << " columns"; + for (int m = 1; m <= cols; m++) { + if (!Verify(result, family, m)) return false; + } + } return true; } @@ -95,35 +137,46 @@ bool DoScan(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Tabl auto end = start + rows; auto width = std::to_string(max_row).length(); scan.SetStartRow(PrefixZero(width, start)); - scan.SetStopRow(PrefixZero(width, end)); + if (end != max_row && end != max_row + 1) { + scan.SetStopRow(PrefixZero(width, end)); + } + + auto start_ns = TimeUtil::GetNowNanos(); auto scanner = table->Scan(scan); - auto cnt = start; + auto cnt = 0; auto r = scanner->Next(); while (r != nullptr) { - auto row = PrefixZero(width, cnt); - if (r->Row().compare(row) != 0) { - LOG(ERROR) << "row " << r->Row() << " is not the expected: " << row; + auto row = PrefixZero(width, start + cnt); + if (!Verify(r, row, families)) { return false; } - for (auto family : families) { - auto cols = std::stoi(*(r->Value(family, kNumColumn))); - VLOG(3) << "scan gets " << std::to_string(cols) << " columns"; - for (int m = 1; m <= cols; m++) { - if (!Verify(r, family, m)) return false; - } - } cnt++; r = scanner->Next(); + if (cnt != 0 && cnt % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Scan iterated over " << cnt << " results in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } + } + if (cnt != rows) { + LOG(ERROR) << "(Thread " << iteration << ") " + << "Expected number of results does not match. expected:" << rows + << ", actual:" << cnt; + return false; } - LOG(INFO) << "Iteration " << iteration << " scanned " << std::to_string(cnt - start) << " rows"; + LOG(INFO) << "(Thread " << iteration << ") " + << "scanned " << std::to_string(cnt) << " rows in " << TimeUtil::ElapsedMillis(start_ns) + << " ms."; return true; } bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table, - const std::vector<std::string> &families, int batch_num_rows) { + const std::vector<std::string> &families, uint64_t batch_num_rows) { auto width = std::to_string(max_row).length(); + auto start_ns = TimeUtil::GetNowNanos(); for (uint64_t k = iteration; k <= max_row;) { + uint64_t total_read = 0; std::vector<hbase::Get> gets; for (uint64_t i = 0; i < batch_num_rows && k <= max_row; ++i, k += FLAGS_threads) { std::string row = PrefixZero(width, k); @@ -132,29 +185,34 @@ bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table } VLOG(3) << "getting for " << batch_num_rows << " rows"; auto results = table->Get(gets); + if (results.size() != gets.size()) { + LOG(ERROR) << "(Thread " << iteration << ") " + << "Expected number of results does not match. expected:" << gets.size() + << ", actual:" << results.size(); + return false; + } for (uint64_t i = 0; i < batch_num_rows && i < results.size(); ++i) { - auto result = results[i]; - if (result == nullptr) { - LOG(ERROR) << "didn't get result"; + if (!Verify(results[i], gets[i].row(), families)) { return false; } - // Test the values - for (auto family : families) { - auto cols = std::stoi(*(result->Value(family, kNumColumn))); - VLOG(3) << "gets " << std::to_string(cols) << " columns"; - for (int m = 1; m <= cols; m++) { - if (!Verify(result, family, m)) return false; - } - } + } + total_read += gets.size(); + if (total_read != 0 && total_read % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Sent " << total_read << " Multi-Get requests in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; } k += batch_num_rows; } - LOG(INFO) << "Sent " << rows << " gets in iteration " << iteration; + LOG(INFO) << "(Thread " << iteration << ") " + << "Sent " << rows << " gets" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; return true; } void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table, const std::vector<std::string> &families) { + auto start_ns = TimeUtil::GetNowNanos(); auto width = std::to_string(max_row).length(); for (uint64_t j = 0; j < rows; j++) { std::string row = PrefixZero(width, iteration * rows + j); @@ -167,15 +225,20 @@ void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique } } table->Put(put); - if ((j + 1) % FLAGS_report_num_rows == 0) - LOG(INFO) << "Written " << std::to_string(j + 1) << " rows"; + if ((j + 1) % FLAGS_report_num_rows == 0) { + LOG(INFO) << "(Thread " << iteration << ") " + << "Written " << std::to_string(j + 1) << " rows in " + << TimeUtil::ElapsedMillis(start_ns) << " ms."; + } } - LOG(INFO) << "written " << std::to_string(rows) << " rows in " << std::to_string(iteration) - << " iteration"; + LOG(INFO) << "(Thread " << iteration << ") " + << "written " << std::to_string(rows) << " rows" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table, const std::vector<std::string> &families) { + auto start_ns = TimeUtil::GetNowNanos(); auto width = std::to_string(max_row).length(); for (uint64_t j = 0; j < rows; j++) { std::string row = PrefixZero(width, iteration * rows + j); @@ -188,20 +251,26 @@ bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols, hbase::Increment{row}.AddColumn(family, incrPrefix + std::to_string(k), k)); if (!table->Append(hbase::Append{row}.Add(family, appendPrefix + std::to_string(k), std::to_string(k)))) { - LOG(ERROR) << "append for " << row << " family: " << family << " failed"; + LOG(ERROR) << "(Thread " << iteration << ") " + << "append for " << row << " family: " << family << " failed"; return false; } } } if ((j + 1) % FLAGS_report_num_rows == 0) - LOG(INFO) << "Written " << std::to_string(j + 1) << " increments"; + LOG(INFO) << "(Thread " << iteration << ") " + << "Written " << std::to_string(j + 1) << " increments" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } + LOG(INFO) << "(Thread " << iteration << ") " + << "written " << std::to_string(rows) << " increments" + << " in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; return true; } int main(int argc, char *argv[]) { - google::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line"); - google::ParseCommandLineFlags(&argc, &argv, true); + gflags::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line"); + gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); google::InstallFailureSignalHandler(); FLAGS_logtostderr = 1; @@ -244,94 +313,78 @@ int main(int argc, char *argv[]) { int rows = FLAGS_num_rows / FLAGS_threads; if (FLAGS_num_rows % FLAGS_threads != 0) rows++; int cols = FLAGS_num_cols; - bool succeeded = true; + std::atomic<int8_t> succeeded{1}; // not using bool since we want atomic &= if (FLAGS_puts) { + LOG(INFO) << "Sending put requests"; auto start_ns = TimeUtil::GetNowNanos(); std::vector<std::thread> writer_threads; for (int i = 0; i < FLAGS_threads; i++) { - writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] { - // Get connection to HBase Table + writer_threads.push_back(std::thread([&, i] { auto table = client->Table(*tn); - DoPut(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families); - table->Close(); })); } - for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end(); - it++) { - std::thread thread = std::move(*it); - thread.join(); + for (auto &t : writer_threads) { + t.join(); } LOG(INFO) << "Successfully sent " << num_puts << " Put requests in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } if (FLAGS_appends) { + LOG(INFO) << "Sending append/increment requests"; auto start_ns = TimeUtil::GetNowNanos(); std::vector<std::thread> writer_threads; for (int i = 0; i < FLAGS_threads; i++) { - writer_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] { - // Get connection to HBase Table + writer_threads.push_back(std::thread([&, i] { auto table = client->Table(*tn); - succeeded &= DoAppendIncrement(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families); - table->Close(); })); } - for (std::vector<std::thread>::iterator it = writer_threads.begin(); it != writer_threads.end(); - it++) { - std::thread thread = std::move(*it); - thread.join(); + for (auto &t : writer_threads) { + t.join(); } LOG(INFO) << "Successfully sent " << num_puts << " append requests in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } if (FLAGS_scans) { + LOG(INFO) << "Sending scan requests"; auto start_ns = TimeUtil::GetNowNanos(); std::vector<std::thread> reader_threads; for (int i = 0; i < FLAGS_threads; i++) { - reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] { - // Get connection to HBase Table + reader_threads.push_back(std::thread([&, i] { auto table1 = client->Table(*tn); - succeeded &= DoScan(i, FLAGS_num_rows - 1, rows, std::move(table1), families); - table1->Close(); })); } - for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end(); - it++) { - std::thread thread = std::move(*it); - thread.join(); + for (auto &t : reader_threads) { + t.join(); } - LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << (succeeded.load() ? "Successfully " : "Failed. ") << " scannned " << num_puts + << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } if (FLAGS_gets) { + LOG(INFO) << "Sending get requests"; auto start_ns = TimeUtil::GetNowNanos(); std::vector<std::thread> reader_threads; for (int i = 0; i < FLAGS_threads; i++) { - reader_threads.push_back(std::thread([=, &client, &tn, &families, &succeeded] { - // Get connection to HBase Table + reader_threads.push_back(std::thread([&, i] { auto table1 = client->Table(*tn); - succeeded &= DoGet(i, FLAGS_num_rows - 1, rows, std::move(table1), families, FLAGS_batch_num_rows); - table1->Close(); })); } - for (std::vector<std::thread>::iterator it = reader_threads.begin(); it != reader_threads.end(); - it++) { - std::thread thread = std::move(*it); - thread.join(); + for (auto &t : reader_threads) { + t.join(); } - LOG(INFO) << (succeeded ? "Successful. " : "Failed. ") << "Spent " - << TimeUtil::ElapsedMillis(start_ns) << " ms."; + LOG(INFO) << (succeeded.load() ? "Successful. " : "Failed. ") << " sent multi-get requests for " + << num_puts << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms."; } client->Close(); - return succeeded ? 0 : -1; + return succeeded.load() ? 0 : -1; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e5643e86/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index d36689e..6730248 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -82,8 +82,8 @@ void ValidateResult(const Result &result, const std::string &row) { } int main(int argc, char *argv[]) { - google::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line"); - google::ParseCommandLineFlags(&argc, &argv, true); + gflags::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line"); + gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); google::InstallFailureSignalHandler(); FLAGS_logtostderr = 1;