http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/rpc-test-server.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/connection/rpc-test-server.cc b/hbase-native-client/src/hbase/connection/rpc-test-server.cc new file mode 100644 index 0000000..337266e --- /dev/null +++ b/hbase-native-client/src/hbase/connection/rpc-test-server.cc @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <wangle/channel/AsyncSocketHandler.h> +#include <wangle/channel/EventBaseHandler.h> +#include <wangle/codec/LengthFieldBasedFrameDecoder.h> +#include <wangle/codec/LengthFieldPrepender.h> +#include <wangle/service/ServerDispatcher.h> + +#include "hbase/connection/rpc-test-server-handler.h" +#include "hbase/connection/rpc-test-server.h" +#include "hbase/if/test.pb.h" + +namespace hbase { + +RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline( + std::shared_ptr<AsyncTransportWrapper> sock) { + if (service_ == nullptr) { + initService(sock); + } + CHECK(service_ != nullptr); + + auto pipeline = RpcTestServerSerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(RpcTestServerSerializeHandler()); + pipeline->addBack(MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>( + service_.get())); + pipeline->finalize(); + + return pipeline; +} + +void RpcTestServerPipelineFactory::initService(std::shared_ptr<AsyncTransportWrapper> sock) { + /* get server address */ + SocketAddress localAddress; + sock->getLocalAddress(&localAddress); + + /* init service with server address */ + service_ = std::make_shared<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>>( + std::make_shared<CPUThreadPoolExecutor>(1), + std::make_shared<RpcTestService>(std::make_shared<SocketAddress>(localAddress))); +} + +Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) { + /* build Response */ + auto response = std::make_unique<Response>(); + response->set_call_id(request->call_id()); + std::string method_name = request->method(); + + if (method_name == "ping") { + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " ping called."; + + } else if (method_name == "echo") { + auto pb_resp_msg = std::make_shared<EchoResponseProto>(); + /* get msg from client */ + auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); + pb_resp_msg->set_message(pb_req_msg->message()); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " echo called, " << pb_req_msg->message(); + + } else if (method_name == "error") { + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " error called."; + response->set_exception(RpcTestException("server error!")); + + } else if (method_name == "pause") { + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + /* sleeping */ + auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); + std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms())); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " pause called, " << pb_req_msg->ms() << " ms"; + + } else if (method_name == "addr") { + // TODO: + } else if (method_name == "socketNotOpen") { + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + response->set_resp_msg(pb_resp_msg); + } + + return folly::makeFuture<std::unique_ptr<Response>>(std::move(response)); +} +} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/rpc-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/connection/rpc-test.cc b/hbase-native-client/src/hbase/connection/rpc-test.cc new file mode 100644 index 0000000..64f8f99 --- /dev/null +++ b/hbase-native-client/src/hbase/connection/rpc-test.cc @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <wangle/bootstrap/ClientBootstrap.h> +#include <wangle/channel/Handler.h> + +#include <folly/Format.h> +#include <folly/Logging.h> +#include <folly/SocketAddress.h> +#include <folly/String.h> +#include <folly/experimental/TestUtil.h> +#include <folly/io/async/AsyncSocketException.h> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <boost/thread.hpp> +#include <chrono> + +#include "hbase/connection/rpc-client.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/test.pb.h" +#include "hbase/connection/rpc-test-server.h" +#include "hbase/security/user.h" +#include "hbase/serde/rpc-serde.h" + +using namespace wangle; +using namespace folly; +using namespace hbase; +using namespace std::chrono; + +DEFINE_int32(port, 0, "test server port"); +DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result"); +DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.", + "output format of enforcing fail with exception"); +DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.", + "output format of enforcing fail without exception"); +typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap; +typedef std::shared_ptr<ServerTestBootstrap> ServerPtr; + +class RpcTest : public ::testing::Test { + public: + static void SetUpTestCase() { google::InstallFailureSignalHandler(); } +}; + +std::shared_ptr<Configuration> CreateConf() { + auto conf = std::make_shared<Configuration>(); + conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true"); + return conf; +} + +ServerPtr CreateRpcServer() { + /* create rpc test server */ + auto server = std::make_shared<ServerTestBootstrap>(); + server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>()); + server->bind(FLAGS_port); + return server; +} + +std::shared_ptr<folly::SocketAddress> GetRpcServerAddress(ServerPtr server) { + auto addr = std::make_shared<folly::SocketAddress>(); + server->getSockets()[0]->getAddress(addr.get()); + return addr; +} + +std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf) { + auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1); + auto client = std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf); + return client; +} + +std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf, + std::chrono::nanoseconds connect_timeout) { + auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1); + auto client = + std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf, connect_timeout); + return client; +} + +/** +* test ping +*/ +TEST_F(RpcTest, Ping) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "ping"; + auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<EmptyResponseProto>(), method); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); + }) + .onError([&](const folly::exception_wrapper& ew) { + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }) + .get(); + + server->stop(); + server->join(); +} + +/** + * test echo + */ +TEST_F(RpcTest, Echo) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "echo"; + auto greetings = "hello, hbase server!"; + auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(), + std::make_shared<EchoResponseProto>(), method); + auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); + pb_msg->set_message(greetings); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message()); + EXPECT_EQ(greetings, pb_resp->message()); + }) + .onError([&](const folly::exception_wrapper& ew) { + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }) + .get(); + + server->stop(); + server->join(); +} + +/** + * test error + */ +TEST_F(RpcTest, Error) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "error"; + auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<EmptyResponseProto>(), method); + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString(); + std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString(); + + /* verify exception_wrapper */ + EXPECT_TRUE(bool(ew)); + EXPECT_EQ(kRemoteException, ew.class_name()); + + /* verify exception */ + EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) { + EXPECT_EQ(kRpcTestException, e.exception_class_name()); + EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace()); + })); + }) + .get(); + + server->stop(); + server->join(); +} + +TEST_F(RpcTest, SocketNotOpen) { + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = CreateRpcClient(conf); + + auto method = "socketNotOpen"; + auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<EmptyResponseProto>(), method); + + server->stop(); + server->join(); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + std::string kConnectionException = + demangle(typeid(hbase::ConnectionException)).toStdString(); + std::string kAsyncSocketException = + demangle(typeid(folly::AsyncSocketException)).toStdString(); + + /* verify exception_wrapper */ + EXPECT_TRUE(bool(ew)); + EXPECT_EQ(kConnectionException, ew.class_name()); + + /* verify exception */ + EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) { + EXPECT_TRUE(bool(e.cause())); + EXPECT_EQ(kAsyncSocketException, e.cause().class_name()); + VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what()); + e.cause().with_exception([&](const folly::AsyncSocketException& ase) { + EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType()); + EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno()); + }); + })); + }) + .get(); +} + +/** + * test pause + */ +TEST_F(RpcTest, Pause) { + int ms = 500; + + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = + CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms))); + + auto method = "pause"; + auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(), + std::make_shared<EmptyResponseProto>(), method); + auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); + + pb_msg->set_ms(ms); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }) + .get(); + + server->stop(); + server->join(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/sasl-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/connection/sasl-handler.cc b/hbase-native-client/src/hbase/connection/sasl-handler.cc new file mode 100644 index 0000000..242665f --- /dev/null +++ b/hbase-native-client/src/hbase/connection/sasl-handler.cc @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "hbase/connection/sasl-handler.h" + +#include <glog/logging.h> +#include <sasl/sasl.h> +#include <sasl/saslplug.h> +#include <sasl/saslutil.h> + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl.h> +#include <wangle/channel/Handler.h> + +#include <condition_variable> +#include <memory> +#include <mutex> +#include <string> +#include <utility> + +#include "hbase/connection/service.h" +#include "hbase/security/user.h" +using hbase::security::User; + +using std::chrono::nanoseconds; +using namespace folly; +using namespace wangle; +using namespace hbase; + +SaslHandler::SaslHandler(std::string user_name, std::shared_ptr<Configuration> conf) + : user_name_(user_name) { + host_name_.clear(); + secure_ = User::IsSecurityEnabled(*conf); + service_name_ = SaslUtil::ParseServiceName(conf, secure_); + sasl_connection_setup_started_.clear(); + sasl_connection_setup_in_progress_.store(true); +} + +SaslHandler::SaslHandler(const SaslHandler &hdlr) { + user_name_ = hdlr.user_name_; + service_name_ = hdlr.service_name_; + secure_ = hdlr.secure_; + host_name_ = hdlr.host_name_; + // copy-constructor sets the flags below to their initial state as opposed to getting them + // from the object this class is constructed from. That way, this instance is ready to do + // sasl stuff without issues, right from the SaslInit. Sharing a sasl session is not useful + // between two handler instances. + sasl_connection_setup_started_.clear(); + sasl_connection_setup_in_progress_.store(true); + sconn_ = nullptr; +} + +SaslHandler::~SaslHandler() { + if (nullptr != sconn_) { + sasl_dispose(&sconn_); + } + sconn_ = nullptr; +} + +void SaslHandler::transportActive(Context *ctx) { + // assign hostname; needed for the sasl handshake if secure + folly::SocketAddress address; + ctx->getTransport()->getPeerAddress(&address); + host_name_ = address.getHostStr(); + + // now init the sasl library; this is once per process + if (secure_) { + sasl_util_.InitializeSaslLib(); + } + // write the preamble to kick off the RPC handshake + VLOG(3) << "Writing RPC connection Preamble to server: " << host_name_; + auto preamble = RpcSerde::Preamble(secure_); + ctx->fireWrite(std::move(preamble)); + ctx->fireTransportActive(); +} + +void SaslHandler::read(Context *ctx, folly::IOBufQueue &buf) { + // if security is not on, or in case of security-on, if secure connection setup not in progress, + // pass it up without touching + if (!secure_ || !sasl_connection_setup_in_progress_.load()) { + ctx->fireRead(buf); + } else { + // message is for this handler; process it appropriately + ContinueSaslNegotiation(ctx, &buf); + } +} + +folly::Future<folly::Unit> SaslHandler::write(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { + // if security is on, and if secure connection setup in progress, + // this message is for this handler to process and respond + if (secure_ && sasl_connection_setup_in_progress_.load()) { + // store IOBuf which is to be sent to server after SASL handshake + iobuf_.push_back(std::move(buf)); + if (!sasl_connection_setup_started_.test_and_set()) { + // for the first incoming RPC from the higher layer, trigger sasl initialization + return SaslInit(ctx); + } else { + // for the subsequent incoming RPCs from the higher layer, just return empty future + folly::Promise<folly::Unit> p_; + return p_.getFuture(); + } + } + // pass the bytes recieved down without touching it + return ctx->fireWrite(std::move(buf)); +} + +folly::Future<folly::Unit> SaslHandler::WriteSaslOutput(Context *ctx, const char *out, + unsigned int outlen) { + int buffer_size = outlen + 4; + auto iob = IOBuf::create(buffer_size); + iob->append(buffer_size); + // Create the array output stream. + google::protobuf::io::ArrayOutputStream aos{iob->writableData(), buffer_size}; + std::unique_ptr<google::protobuf::io::CodedOutputStream> coded_output = + std::make_unique<google::protobuf::io::CodedOutputStream>(&aos); + uint32_t total_size = outlen; + total_size = ntohl(total_size); + coded_output->WriteRaw(&total_size, 4); + coded_output->WriteRaw(out, outlen); + return ctx->fireWrite(std::move(iob)); +} + +void SaslHandler::FinishAuth(Context *ctx, folly::IOBufQueue *bufQueue) { + std::unique_ptr<folly::IOBuf> iob; + if (!bufQueue->empty()) { + iob = bufQueue->pop_front(); + throw std::runtime_error("Error in the final step of handshake " + + std::string(reinterpret_cast<const char *>(iob->data()))); + } else { + sasl_connection_setup_in_progress_.store(false); + // write what we buffered + for (size_t i = 0; i < iobuf_.size(); i++) { + iob = std::move(iobuf_.at(i)); + ctx->fireWrite(std::move(iob)); + } + } +} + +folly::Future<folly::Unit> SaslHandler::SaslInit(Context *ctx) { + int rc; + const char *mechusing, *mechlist = "GSSAPI"; + const char *out; + unsigned int outlen; + + rc = sasl_client_new(service_name_.c_str(), /* The service we are using*/ + host_name_.c_str(), NULL, + NULL, /* Local and remote IP address strings + (NULL disables mechanisms which require this info)*/ + NULL, /*connection-specific callbacks*/ + 0 /*security flags*/, &sconn_); + if (rc != SASL_OK) { + LOG(FATAL) << "Cannot create client (" << rc << ") "; + throw std::runtime_error("Cannot create client"); + } + int curr_rc; + do { + curr_rc = sasl_client_start(sconn_, /* the same context from above */ + mechlist, /* the list of mechanisms from the server */ + NULL, /* filled in if an interaction is needed */ + &out, /* filled in on success */ + &outlen, /* filled in on success */ + &mechusing); + } while (curr_rc == SASL_INTERACT); /* the mechanism may ask us to fill + in things many times. result is SASL_CONTINUE on success */ + if (curr_rc != SASL_CONTINUE) { + throw std::runtime_error("Cannot start client (" + std::to_string(curr_rc) + ")"); + } + folly::Future<folly::Unit> fut = WriteSaslOutput(ctx, out, outlen); + return fut; +} + +void SaslHandler::ContinueSaslNegotiation(Context *ctx, folly::IOBufQueue *bufQueue) { + const char *out; + unsigned int outlen; + + int bytes_sent = 0; + int bytes_received = 0; + + std::unique_ptr<folly::IOBuf> iob = bufQueue->pop_front(); + bytes_received = iob->length(); + if (bytes_received == 0) { + throw std::runtime_error("Error in sasl handshake"); + } + folly::io::RWPrivateCursor c(iob.get()); + std::uint32_t status = c.readBE<std::uint32_t>(); + std::uint32_t sz = c.readBE<std::uint32_t>(); + + if (status != 0 /*Status 0 is success*/) { + // Assumption here is that the response from server is not more than 8 * 1024 + throw std::runtime_error("Error in sasl handshake " + + std::string(reinterpret_cast<char *>(c.writableData()))); + } + out = nullptr; + outlen = 0; + + int curr_rc = + sasl_client_step(sconn_, /* our context */ + reinterpret_cast<char *>(c.writableData()), /* the data from the server */ + sz, /* its length */ + NULL, /* this should be unallocated and NULL */ + &out, /* filled in on success */ + &outlen); /* filled in on success */ + + if (curr_rc == SASL_OK || curr_rc == SASL_CONTINUE) { + WriteSaslOutput(ctx, out, outlen); + } + if (curr_rc == SASL_OK) { + FinishAuth(ctx, bufQueue); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/sasl-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/connection/sasl-util.cc b/hbase-native-client/src/hbase/connection/sasl-util.cc new file mode 100644 index 0000000..7e7403e --- /dev/null +++ b/hbase-native-client/src/hbase/connection/sasl-util.cc @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "hbase/connection/sasl-util.h" + +#include <glog/logging.h> +#include <sasl/sasl.h> +#include <sasl/saslplug.h> +#include <sasl/saslutil.h> + +#include <string> + +int SaslUtil::GetPluginPath(void *context __attribute__((unused)), const char **path) { + *path = getenv("SASL_PATH"); + + if (*path == NULL) { + *path = kDefaultPluginDir; + } + return SASL_OK; +} + +void *SaslUtil::MutexNew(void) { + auto m = new std::mutex(); + return m; +} + +int SaslUtil::MutexLock(void *m) { + (reinterpret_cast<std::mutex *>(m))->lock(); + return SASL_OK; +} + +int SaslUtil::MutexUnlock(void *m) { + (reinterpret_cast<std::mutex *>(m))->unlock(); + return SASL_OK; +} + +void SaslUtil::MutexDispose(void *m) { + std::mutex *mutex = reinterpret_cast<std::mutex *>(m); + delete mutex; +} + +std::once_flag SaslUtil::library_inited_; + +void SaslUtil::InitializeSaslLib() { + std::call_once(library_inited_, []() { + sasl_set_mutex(reinterpret_cast<sasl_mutex_alloc_t *>(&SaslUtil::MutexNew), + reinterpret_cast<sasl_mutex_lock_t *>(&SaslUtil::MutexLock), + reinterpret_cast<sasl_mutex_unlock_t *>(&SaslUtil::MutexUnlock), + reinterpret_cast<sasl_mutex_free_t *>(&SaslUtil::MutexDispose)); + static sasl_callback_t callbacks[] = { + {SASL_CB_GETPATH, (sasl_callback_ft)&SaslUtil::GetPluginPath, NULL}, + {SASL_CB_LIST_END, NULL, NULL}}; + int rc = sasl_client_init(callbacks); + if (rc != SASL_OK) { + throw std::runtime_error("Cannot initialize client " + std::to_string(rc)); + } + }); +} + +std::string SaslUtil::ParseServiceName(std::shared_ptr<hbase::Configuration> conf, bool secure) { + if (!secure) { + return std::string(); + } + std::string svrPrincipal = conf->Get(kServerPrincipalConfKey, ""); + // principal is of this form: hbase/23a039358...@example.com + // where 23a03935850c is the host (optional) + std::size_t pos = svrPrincipal.find("/"); + if (pos == std::string::npos && svrPrincipal.find("@") != std::string::npos) { + pos = svrPrincipal.find("@"); + } + if (pos == std::string::npos) { + throw std::runtime_error("Couldn't retrieve service principal from conf"); + } + VLOG(1) << "pos " << pos << " " << svrPrincipal; + std::string service_name = svrPrincipal.substr(0, pos); + return service_name; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/exceptions/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/exceptions/BUCK b/hbase-native-client/src/hbase/exceptions/BUCK new file mode 100644 index 0000000..00ed344 --- /dev/null +++ b/hbase-native-client/src/hbase/exceptions/BUCK @@ -0,0 +1,37 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cxx_library( + name="exceptions", + srcs=[ + "exception.cc", + ], + deps=[ + "//include/hbase/exceptions:exceptions", + "//third-party:folly", + ], + compiler_flags=['-Weffc++'], + visibility=['//src/hbase/client/...', '//src/hbase/connection/...'],) +cxx_test( + name="exception-test", + srcs=[ + "exception-test.cc", + ], + deps=[ + ":exceptions", + ], + run_test_separately=True,) http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/exceptions/exception-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/exceptions/exception-test.cc b/hbase-native-client/src/hbase/exceptions/exception-test.cc new file mode 100644 index 0000000..e28f084 --- /dev/null +++ b/hbase-native-client/src/hbase/exceptions/exception-test.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <gtest/gtest.h> + +#include "hbase/exceptions/exception.h" + +#include "folly/ExceptionWrapper.h" + +using hbase::ExceptionUtil; +using hbase::IOException; +using hbase::RemoteException; + +TEST(ExceptionUtilTest, IOExceptionShouldRetry) { + IOException ex{}; + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_do_not_retry(true); + EXPECT_FALSE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_do_not_retry(false); + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + IOException ex2{"description", true}; + EXPECT_FALSE(ExceptionUtil::ShouldRetry(ex2)); + + IOException ex3{"description", std::runtime_error("ex"), true}; + EXPECT_FALSE(ExceptionUtil::ShouldRetry(ex3)); +} + +TEST(ExceptionUtilTest, RemoteExceptionShouldRetry) { + RemoteException ex{}; + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_do_not_retry(true); + EXPECT_FALSE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_do_not_retry(false); + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_exception_class_name("org.apache.hadoop.hbase.FooException"); + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_exception_class_name("org.apache.hadoop.hbase.NotServingRegionException"); + EXPECT_TRUE(ExceptionUtil::ShouldRetry(ex)); + + ex.set_exception_class_name("org.apache.hadoop.hbase.UnknownRegionException"); + EXPECT_FALSE(ExceptionUtil::ShouldRetry(ex)); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/exceptions/exception.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/exceptions/exception.cc b/hbase-native-client/src/hbase/exceptions/exception.cc new file mode 100644 index 0000000..91ff818 --- /dev/null +++ b/hbase-native-client/src/hbase/exceptions/exception.cc @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "hbase/exceptions/exception.h" + +namespace hbase { +const std::vector<const char*> ExceptionUtil::kAllDoNotRetryIOExceptions = { + kDoNotRetryIOException, + kTableNotFoundException, + kTableNotEnabledException, + kCoprocessorException, + kBypassCoprocessorException, + kInvalidFamilyOperationException, + kServerTooBusyException, + kFailedSanityCheckException, + kCorruptHFileException, + kLabelAlreadyExistsException, + kFatalConnectionException, + kUnsupportedCryptoException, + kUnsupportedCellCodecException, + kEmptyServiceNameException, + kUnknownServiceException, + kWrongVersionException, + kBadAuthException, + kUnsupportedCompressionCodecException, + kDoNotRetryRegionException, + kRowTooBigException, + kRowTooBigExceptionDeprecated, + kUnknownRegionException, + kMergeRegionException, + kNoServerForRegionException, + kQuotaExceededException, + kSpaceLimitingException, + kThrottlingException, + kAccessDeniedException, + kUnknownProtocolException, + kRequestTooBigException, + kNotAllMetaRegionsOnlineException, + kConstraintException, + kNoSuchColumnFamilyException, + kLeaseException, + kInvalidLabelException, + kUnknownScannerException, + kScannerResetException, + kOutOfOrderScannerNextException}; + +bool ExceptionUtil::ShouldRetry(const folly::exception_wrapper& error) { + bool do_not_retry = false; + error.with_exception( + [&](const IOException& ioe) { do_not_retry = do_not_retry || ioe.do_not_retry(); }); + error.with_exception([&](const RemoteException& remote_ex) { + do_not_retry = do_not_retry || IsJavaDoNotRetryException(remote_ex.exception_class_name()); + }); + return !do_not_retry; +} + +/** + * Returns whether the java exception class extends DoNotRetryException. + * In the java side, we just have a hierarchy of Exception classes that we use + * both client side and server side. On the client side, we rethrow the server + * side exception by un-wrapping the exception from a RemoteException or a ServiceException + * (see ConnectionUtils.translateException() in Java). + * Since this object-hierarchy info is not available in C++ side, we are doing a + * very fragile catch-all list of all exception types in Java that extend the + * DoNotRetryException class type. + */ +bool ExceptionUtil::IsJavaDoNotRetryException(const std::string& java_class_name) { + for (auto exception : kAllDoNotRetryIOExceptions) { + if (java_class_name == exception) { + return true; + } + } + return false; +} + +/** + * Returns whether the scanner is closed when the client received the + * remote exception. + * Since the object-hierarchy info is not available in C++ side, we are doing a + * very fragile catch-all list of all exception types in Java that extend these + * three base classes: UnknownScannerException, NotServingRegionException, + * RegionServerStoppedException + */ +bool ExceptionUtil::IsScannerClosed(const folly::exception_wrapper& exception) { + bool scanner_closed = false; + exception.with_exception([&](const RemoteException& remote_ex) { + auto java_class = remote_ex.exception_class_name(); + if (java_class == kUnknownScannerException || java_class == kNotServingRegionException || + java_class == kRegionInRecoveryException || java_class == kRegionOpeningException || + java_class == kRegionMovedException || java_class == kRegionServerStoppedException || + java_class == kRegionServerAbortedException) { + scanner_closed = true; + } + }); + return scanner_closed; +} + +/** + * Returns whether the wrapped exception is a java exception of type OutOfOrderScannerNextException + * or ScannerResetException. These two exception types are thrown from the server side when the + * scanner on the server side is closed. + */ +bool ExceptionUtil::IsScannerOutOfOrder(const folly::exception_wrapper& exception) { + bool scanner_out_of_order = false; + exception.with_exception([&](const RemoteException& remote_ex) { + auto java_class = remote_ex.exception_class_name(); + if (java_class == kOutOfOrderScannerNextException || java_class == kScannerResetException) { + scanner_out_of_order = true; + } + }); + return scanner_out_of_order; +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/AccessControl.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/AccessControl.proto b/hbase-native-client/src/hbase/if/AccessControl.proto new file mode 100644 index 0000000..e67540b --- /dev/null +++ b/hbase-native-client/src/hbase/if/AccessControl.proto @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "AccessControlProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; + +message Permission { + enum Action { + READ = 0; + WRITE = 1; + EXEC = 2; + CREATE = 3; + ADMIN = 4; + } + enum Type { + Global = 1; + Namespace = 2; + Table = 3; + } + required Type type = 1; + optional GlobalPermission global_permission = 2; + optional NamespacePermission namespace_permission = 3; + optional TablePermission table_permission = 4; +} + +message TablePermission { + optional TableName table_name = 1; + optional bytes family = 2; + optional bytes qualifier = 3; + repeated Permission.Action action = 4; +} + +message NamespacePermission { + optional bytes namespace_name = 1; + repeated Permission.Action action = 2; +} + +message GlobalPermission { + repeated Permission.Action action = 1; +} + +message UserPermission { + required bytes user = 1; + required Permission permission = 3; +} + +/** + * Content of the /hbase/acl/<table or namespace> znode. + */ +message UsersAndPermissions { + message UserPermissions { + required bytes user = 1; + repeated Permission permissions = 2; + } + + repeated UserPermissions user_permissions = 1; +} + +message GrantRequest { + required UserPermission user_permission = 1; +} + +message GrantResponse { +} + +message RevokeRequest { + required UserPermission user_permission = 1; +} + +message RevokeResponse { +} + +message GetUserPermissionsRequest { + optional Permission.Type type = 1; + optional TableName table_name = 2; + optional bytes namespace_name = 3; +} + +message GetUserPermissionsResponse { + repeated UserPermission user_permission = 1; +} + +message CheckPermissionsRequest { + repeated Permission permission = 1; +} + +message CheckPermissionsResponse { +} + +service AccessControlService { + rpc Grant(GrantRequest) + returns (GrantResponse); + + rpc Revoke(RevokeRequest) + returns (RevokeResponse); + + rpc GetUserPermissions(GetUserPermissionsRequest) + returns (GetUserPermissionsResponse); + + rpc CheckPermissions(CheckPermissionsRequest) + returns (CheckPermissionsResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/Admin.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/Admin.proto b/hbase-native-client/src/hbase/if/Admin.proto new file mode 100644 index 0000000..a1905a4 --- /dev/null +++ b/hbase-native-client/src/hbase/if/Admin.proto @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers that are used for Admin service. +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "AdminProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "Client.proto"; +import "HBase.proto"; +import "WAL.proto"; + +message GetRegionInfoRequest { + required RegionSpecifier region = 1; + optional bool compaction_state = 2; +} + +message GetRegionInfoResponse { + required RegionInfo region_info = 1; + optional CompactionState compaction_state = 2; + optional bool isRecovering = 3; + + enum CompactionState { + NONE = 0; + MINOR = 1; + MAJOR = 2; + MAJOR_AND_MINOR = 3; + } +} + +/** + * Get a list of store files for a set of column families in a particular region. + * If no column family is specified, get the store files for all column families. + */ +message GetStoreFileRequest { + required RegionSpecifier region = 1; + repeated bytes family = 2; +} + +message GetStoreFileResponse { + repeated string store_file = 1; +} + +message GetOnlineRegionRequest { +} + +message GetOnlineRegionResponse { + repeated RegionInfo region_info = 1; +} + +message OpenRegionRequest { + repeated RegionOpenInfo open_info = 1; + // the intended server for this RPC. + optional uint64 serverStartCode = 2; + // wall clock time from master + optional uint64 master_system_time = 5; + + message RegionOpenInfo { + required RegionInfo region = 1; + optional uint32 version_of_offline_node = 2; + repeated ServerName favored_nodes = 3; + // open region for distributedLogReplay + optional bool openForDistributedLogReplay = 4; + } +} + +message OpenRegionResponse { + repeated RegionOpeningState opening_state = 1; + + enum RegionOpeningState { + OPENED = 0; + ALREADY_OPENED = 1; + FAILED_OPENING = 2; + } +} + +message WarmupRegionRequest { + + required RegionInfo regionInfo = 1; +} + +message WarmupRegionResponse { +} + +/** + * Closes the specified region and will use or not use ZK during the close + * according to the specified flag. + */ +message CloseRegionRequest { + required RegionSpecifier region = 1; + optional uint32 version_of_closing_node = 2; + optional bool transition_in_ZK = 3 [default = true]; + optional ServerName destination_server = 4; + // the intended server for this RPC. + optional uint64 serverStartCode = 5; +} + +message CloseRegionResponse { + required bool closed = 1; +} + +/** + * Flushes the MemStore of the specified region. + * <p> + * This method is synchronous. + */ +message FlushRegionRequest { + required RegionSpecifier region = 1; + optional uint64 if_older_than_ts = 2; + optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed +} + +message FlushRegionResponse { + required uint64 last_flush_time = 1; + optional bool flushed = 2; + optional bool wrote_flush_wal_marker = 3; +} + +/** + * Splits the specified region. + * <p> + * This method currently flushes the region and then forces a compaction which + * will then trigger a split. The flush is done synchronously but the + * compaction is asynchronous. + */ +message SplitRegionRequest { + required RegionSpecifier region = 1; + optional bytes split_point = 2; +} + +message SplitRegionResponse { +} + +/** + * Compacts the specified region. Performs a major compaction if specified. + * <p> + * This method is asynchronous. + */ +message CompactRegionRequest { + required RegionSpecifier region = 1; + optional bool major = 2; + optional bytes family = 3; +} + +message CompactRegionResponse { +} + +message UpdateFavoredNodesRequest { + repeated RegionUpdateInfo update_info = 1; + + message RegionUpdateInfo { + required RegionInfo region = 1; + repeated ServerName favored_nodes = 2; + } +} + +message UpdateFavoredNodesResponse { + optional uint32 response = 1; +} + +/** + * Merges the specified regions. + * <p> + * This method currently closes the regions and then merges them + */ +message MergeRegionsRequest { + required RegionSpecifier region_a = 1; + required RegionSpecifier region_b = 2; + optional bool forcible = 3 [default = false]; + // wall clock time from master + optional uint64 master_system_time = 4; +} + +message MergeRegionsResponse { +} + +// Protocol buffer version of WAL for replication +message WALEntry { + required WALKey key = 1; + // Following may be null if the KVs/Cells are carried along the side in a cellblock (See + // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null + // and associated_cell_count has count of Cells associated w/ this WALEntry + repeated bytes key_value_bytes = 2; + // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock. + optional int32 associated_cell_count = 3; +} + +/** + * Replicates the given entries. The guarantee is that the given entries + * will be durable on the slave cluster if this method returns without + * any exception. hbase.replication has to be set to true for this to work. + */ +message ReplicateWALEntryRequest { + repeated WALEntry entry = 1; + optional string replicationClusterId = 2; + optional string sourceBaseNamespaceDirPath = 3; + optional string sourceHFileArchiveDirPath = 4; +} + +message ReplicateWALEntryResponse { +} + +message RollWALWriterRequest { +} + +/* + * Roll request responses no longer include regions to flush + * this list will always be empty when talking to a 1.0 server + */ +message RollWALWriterResponse { + // A list of encoded name of regions to flush + repeated bytes region_to_flush = 1; +} + +message StopServerRequest { + required string reason = 1; +} + +message StopServerResponse { +} + +message GetServerInfoRequest { +} + +message ServerInfo { + required ServerName server_name = 1; + optional uint32 webui_port = 2; +} + +message GetServerInfoResponse { + required ServerInfo server_info = 1; +} + +message UpdateConfigurationRequest { +} + +message UpdateConfigurationResponse { +} + +service AdminService { + rpc GetRegionInfo(GetRegionInfoRequest) + returns(GetRegionInfoResponse); + + rpc GetStoreFile(GetStoreFileRequest) + returns(GetStoreFileResponse); + + rpc GetOnlineRegion(GetOnlineRegionRequest) + returns(GetOnlineRegionResponse); + + rpc OpenRegion(OpenRegionRequest) + returns(OpenRegionResponse); + + rpc WarmupRegion(WarmupRegionRequest) + returns(WarmupRegionResponse); + + rpc CloseRegion(CloseRegionRequest) + returns(CloseRegionResponse); + + rpc FlushRegion(FlushRegionRequest) + returns(FlushRegionResponse); + + rpc SplitRegion(SplitRegionRequest) + returns(SplitRegionResponse); + + rpc CompactRegion(CompactRegionRequest) + returns(CompactRegionResponse); + + rpc MergeRegions(MergeRegionsRequest) + returns(MergeRegionsResponse); + + rpc ReplicateWALEntry(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse); + + rpc Replay(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse); + + rpc RollWALWriter(RollWALWriterRequest) + returns(RollWALWriterResponse); + + rpc GetServerInfo(GetServerInfoRequest) + returns(GetServerInfoResponse); + + rpc StopServer(StopServerRequest) + returns(StopServerResponse); + + rpc UpdateFavoredNodes(UpdateFavoredNodesRequest) + returns(UpdateFavoredNodesResponse); + + rpc UpdateConfiguration(UpdateConfigurationRequest) + returns(UpdateConfigurationResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/Aggregate.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/Aggregate.proto b/hbase-native-client/src/hbase/if/Aggregate.proto new file mode 100644 index 0000000..4d32e70 --- /dev/null +++ b/hbase-native-client/src/hbase/if/Aggregate.proto @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "AggregateProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "Client.proto"; + +message AggregateRequest { + /** The request passed to the AggregateService consists of three parts + * (1) the (canonical) classname of the ColumnInterpreter implementation + * (2) the Scan query + * (3) any bytes required to construct the ColumnInterpreter object + * properly + */ + required string interpreter_class_name = 1; + required Scan scan = 2; + optional bytes interpreter_specific_bytes = 3; +} + +message AggregateResponse { + /** + * The AggregateService methods all have a response that either is a Pair + * or a simple object. When it is a Pair both first_part and second_part + * have defined values (and the second_part is not present in the response + * when the response is not a pair). Refer to the AggregateImplementation + * class for an overview of the AggregateResponse object constructions. + */ + repeated bytes first_part = 1; + optional bytes second_part = 2; +} + +/** Refer to the AggregateImplementation class for an overview of the + * AggregateService method implementations and their functionality. + */ +service AggregateService { + rpc GetMax (AggregateRequest) returns (AggregateResponse); + rpc GetMin (AggregateRequest) returns (AggregateResponse); + rpc GetSum (AggregateRequest) returns (AggregateResponse); + rpc GetRowNum (AggregateRequest) returns (AggregateResponse); + rpc GetAvg (AggregateRequest) returns (AggregateResponse); + rpc GetStd (AggregateRequest) returns (AggregateResponse); + rpc GetMedian (AggregateRequest) returns (AggregateResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/Authentication.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/Authentication.proto b/hbase-native-client/src/hbase/if/Authentication.proto new file mode 100644 index 0000000..2f64799 --- /dev/null +++ b/hbase-native-client/src/hbase/if/Authentication.proto @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "AuthenticationProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message AuthenticationKey { + required int32 id = 1; + required int64 expiration_date = 2; + required bytes key = 3; +} + + +message TokenIdentifier { + enum Kind { + HBASE_AUTH_TOKEN = 0; + } + required Kind kind = 1; + required bytes username = 2; + required int32 key_id = 3; + optional int64 issue_date = 4; + optional int64 expiration_date = 5; + optional int64 sequence_number = 6; +} + + +// Serialization of the org.apache.hadoop.security.token.Token class +// Note that this is a Hadoop class, so fields may change! +message Token { + // the TokenIdentifier in serialized form + // Note: we can't use the protobuf directly because the Hadoop Token class + // only stores the serialized bytes + optional bytes identifier = 1; + optional bytes password = 2; + optional bytes service = 3; +} + + +// RPC request & response messages +message GetAuthenticationTokenRequest { +} + +message GetAuthenticationTokenResponse { + optional Token token = 1; +} + +message WhoAmIRequest { +} + +message WhoAmIResponse { + optional string username = 1; + optional string auth_method = 2; +} + + +// RPC service +service AuthenticationService { + rpc GetAuthenticationToken(GetAuthenticationTokenRequest) + returns (GetAuthenticationTokenResponse); + + rpc WhoAmI(WhoAmIRequest) + returns (WhoAmIResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/BUCK b/hbase-native-client/src/hbase/if/BUCK new file mode 100644 index 0000000..c8d51f2 --- /dev/null +++ b/hbase-native-client/src/hbase/if/BUCK @@ -0,0 +1,49 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +PROTO_SRCS = glob(['*.proto']) +HEADER_FILENAMES = [x.replace('.proto', '.pb.h') for x in PROTO_SRCS] +CC_FILENAMES = [x.replace('.proto', '.pb.cc') for x in PROTO_SRCS] + +genrule( + name='generate-proto-sources', + srcs=PROTO_SRCS, + cmd='mkdir -p $OUT && pwd && protoc --proto_path=. --cpp_out=$OUT *.proto', + out='output', ) + +for header_filename in HEADER_FILENAMES: + genrule(name=header_filename, + cmd='mkdir -p `dirname $OUT` ' + ' && cp $(location :generate-proto-sources)/{} $OUT'.format( + header_filename), + out=header_filename, ) +for cc_filename in CC_FILENAMES: + genrule( + name=cc_filename, + cmd='mkdir -p `dirname $OUT` ' + ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` ' + ' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format( + cc_filename), + out=cc_filename, ) + +cxx_library(name='if', + header_namespace="hbase/if", + exported_headers=[':' + x for x in HEADER_FILENAMES], + srcs=[':' + x for x in CC_FILENAMES], + deps=['//third-party:protobuf'], + visibility=['PUBLIC', ], + exported_deps=['//third-party:protobuf']) http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/Cell.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/Cell.proto b/hbase-native-client/src/hbase/if/Cell.proto new file mode 100644 index 0000000..2c61035 --- /dev/null +++ b/hbase-native-client/src/hbase/if/Cell.proto @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Cell and KeyValue protos +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "CellProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +/** + * The type of the key in a Cell + */ +enum CellType { + MINIMUM = 0; + PUT = 4; + + DELETE = 8; + DELETE_COLUMN = 12; + DELETE_FAMILY = 14; + + // MAXIMUM is used when searching; you look from maximum on down. + MAXIMUM = 255; +} + +/** + * Protocol buffer version of Cell. + */ +message Cell { + optional bytes row = 1; + optional bytes family = 2; + optional bytes qualifier = 3; + optional uint64 timestamp = 4; + optional CellType cell_type = 5; + optional bytes value = 6; + optional bytes tags = 7; +} + +/** + * Protocol buffer version of KeyValue. + * It doesn't have those transient parameters + */ +message KeyValue { + required bytes row = 1; + required bytes family = 2; + required bytes qualifier = 3; + optional uint64 timestamp = 4; + optional CellType key_type = 5; + optional bytes value = 6; + optional bytes tags = 7; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/Client.proto b/hbase-native-client/src/hbase/if/Client.proto new file mode 100644 index 0000000..8a4d459 --- /dev/null +++ b/hbase-native-client/src/hbase/if/Client.proto @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers that are used for Client service. +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "ClientProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; +import "Filter.proto"; +import "Cell.proto"; +import "Comparator.proto"; +import "MapReduce.proto"; + +/** + * The protocol buffer version of Authorizations. + */ +message Authorizations { + repeated string label = 1; +} + +/** + * The protocol buffer version of CellVisibility. + */ +message CellVisibility { + required string expression = 1; +} + +/** + * Container for a list of column qualifier names of a family. + */ +message Column { + required bytes family = 1; + repeated bytes qualifier = 2; +} + +/** + * Consistency defines the expected consistency level for an operation. + */ +enum Consistency { + STRONG = 0; + TIMELINE = 1; +} + +/** + * The protocol buffer version of Get. + * Unless existence_only is specified, return all the requested data + * for the row that matches exactly. + */ +message Get { + required bytes row = 1; + repeated Column column = 2; + repeated NameBytesPair attribute = 3; + optional Filter filter = 4; + optional TimeRange time_range = 5; + optional uint32 max_versions = 6 [default = 1]; + optional bool cache_blocks = 7 [default = true]; + optional uint32 store_limit = 8; + optional uint32 store_offset = 9; + + // The result isn't asked for, just check for + // the existence. + optional bool existence_only = 10 [default = false]; + + optional Consistency consistency = 12 [default = STRONG]; + repeated ColumnFamilyTimeRange cf_time_range = 13; +} + +message Result { + // Result includes the Cells or else it just has a count of Cells + // that are carried otherwise. + repeated Cell cell = 1; + // The below count is set when the associated cells are + // not part of this protobuf message; they are passed alongside + // and then this Message is just a placeholder with metadata. + // The count is needed to know how many to peel off the block of Cells as + // ours. NOTE: This is different from the pb managed cell_count of the + // 'cell' field above which is non-null when the cells are pb'd. + optional int32 associated_cell_count = 2; + + // used for Get to check existence only. Not set if existence_only was not set to true + // in the query. + optional bool exists = 3; + + // Whether or not the results are coming from possibly stale data + optional bool stale = 4 [default = false]; + + // Whether or not the entire result could be returned. Results will be split when + // the RPC chunk size limit is reached. Partial results contain only a subset of the + // cells for a row and must be combined with a result containing the remaining cells + // to form a complete result + optional bool partial = 5 [default = false]; +} + +/** + * The get request. Perform a single Get operation. + */ +message GetRequest { + required RegionSpecifier region = 1; + required Get get = 2; +} + +message GetResponse { + optional Result result = 1; +} + +/** + * Condition to check if the value of a given cell (row, + * family, qualifier) matches a value via a given comparator. + * + * Condition is used in check and mutate operations. + */ +message Condition { + required bytes row = 1; + required bytes family = 2; + required bytes qualifier = 3; + required CompareType compare_type = 4; + required Comparator comparator = 5; +} + + +/** + * A specific mutation inside a mutate request. + * It can be an append, increment, put or delete based + * on the mutation type. It can be fully filled in or + * only metadata present because data is being carried + * elsewhere outside of pb. + */ +message MutationProto { + optional bytes row = 1; + optional MutationType mutate_type = 2; + repeated ColumnValue column_value = 3; + optional uint64 timestamp = 4; + repeated NameBytesPair attribute = 5; + optional Durability durability = 6 [default = USE_DEFAULT]; + + // For some mutations, a result may be returned, in which case, + // time range can be specified for potential performance gain + optional TimeRange time_range = 7; + // The below count is set when the associated cells are NOT + // part of this protobuf message; they are passed alongside + // and then this Message is a placeholder with metadata. The + // count is needed to know how many to peel off the block of Cells as + // ours. NOTE: This is different from the pb managed cell_count of the + // 'cell' field above which is non-null when the cells are pb'd. + optional int32 associated_cell_count = 8; + + optional uint64 nonce = 9; + + enum Durability { + USE_DEFAULT = 0; + SKIP_WAL = 1; + ASYNC_WAL = 2; + SYNC_WAL = 3; + FSYNC_WAL = 4; + } + + enum MutationType { + APPEND = 0; + INCREMENT = 1; + PUT = 2; + DELETE = 3; + } + + enum DeleteType { + DELETE_ONE_VERSION = 0; + DELETE_MULTIPLE_VERSIONS = 1; + DELETE_FAMILY = 2; + DELETE_FAMILY_VERSION = 3; + } + + message ColumnValue { + required bytes family = 1; + repeated QualifierValue qualifier_value = 2; + + message QualifierValue { + optional bytes qualifier = 1; + optional bytes value = 2; + optional uint64 timestamp = 3; + optional DeleteType delete_type = 4; + optional bytes tags = 5; + } + } +} + +/** + * The mutate request. Perform a single Mutate operation. + * + * Optionally, you can specify a condition. The mutate + * will take place only if the condition is met. Otherwise, + * the mutate will be ignored. In the response result, + * parameter processed is used to indicate if the mutate + * actually happened. + */ +message MutateRequest { + required RegionSpecifier region = 1; + required MutationProto mutation = 2; + optional Condition condition = 3; + optional uint64 nonce_group = 4; +} + +message MutateResponse { + optional Result result = 1; + + // used for mutate to indicate processed only + optional bool processed = 2; +} + +/** + * Instead of get from a table, you can scan it with optional filters. + * You can specify the row key range, time range, the columns/families + * to scan and so on. + * + * This scan is used the first time in a scan request. The response of + * the initial scan will return a scanner id, which should be used to + * fetch result batches later on before it is closed. + */ +message Scan { + repeated Column column = 1; + repeated NameBytesPair attribute = 2; + optional bytes start_row = 3; + optional bytes stop_row = 4; + optional Filter filter = 5; + optional TimeRange time_range = 6; + optional uint32 max_versions = 7 [default = 1]; + optional bool cache_blocks = 8 [default = true]; + optional uint32 batch_size = 9; + optional uint64 max_result_size = 10; + optional uint32 store_limit = 11; + optional uint32 store_offset = 12; + optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */ + optional bool small = 14; + optional bool reversed = 15 [default = false]; + optional Consistency consistency = 16 [default = STRONG]; + optional uint32 caching = 17; + optional bool allow_partial_results = 18; + repeated ColumnFamilyTimeRange cf_time_range = 19; +} + +/** + * A scan request. Initially, it should specify a scan. Later on, you + * can use the scanner id returned to fetch result batches with a different + * scan request. + * + * The scanner will remain open if there are more results, and it's not + * asked to be closed explicitly. + * + * You can fetch the results and ask the scanner to be closed to save + * a trip if you are not interested in remaining results. + */ +message ScanRequest { + optional RegionSpecifier region = 1; + optional Scan scan = 2; + optional uint64 scanner_id = 3; + optional uint32 number_of_rows = 4; + optional bool close_scanner = 5; + optional uint64 next_call_seq = 6; + optional bool client_handles_partials = 7; + optional bool client_handles_heartbeats = 8; + optional bool track_scan_metrics = 9; + optional bool renew = 10 [default = false]; +} + +/** + * The scan response. If there are no more results, more_results will + * be false. If it is not specified, it means there are more. + */ +message ScanResponse { + // This field is filled in if we are doing cellblocks. A cellblock is made up + // of all Cells serialized out as one cellblock BUT responses from a server + // have their Cells grouped by Result. So we can reconstitute the + // Results on the client-side, this field is a list of counts of Cells + // in each Result that makes up the response. For example, if this field + // has 3, 3, 3 in it, then we know that on the client, we are to make + // three Results each of three Cells each. + repeated uint32 cells_per_result = 1; + + optional uint64 scanner_id = 2; + optional bool more_results = 3; + optional uint32 ttl = 4; + // If cells are not carried in an accompanying cellblock, then they are pb'd here. + // This field is mutually exclusive with cells_per_result (since the Cells will + // be inside the pb'd Result) + repeated Result results = 5; + optional bool stale = 6; + + // This field is filled in if we are doing cellblocks. In the event that a row + // could not fit all of its cells into a single RPC chunk, the results will be + // returned as partials, and reconstructed into a complete result on the client + // side. This field is a list of flags indicating whether or not the result + // that the cells belong to is a partial result. For example, if this field + // has false, false, true in it, then we know that on the client side, we need to + // make another RPC request since the last result was only a partial. + repeated bool partial_flag_per_result = 7; + + // A server may choose to limit the number of results returned to the client for + // reasons such as the size in bytes or quantity of results accumulated. This field + // will true when more results exist in the current region. + optional bool more_results_in_region = 8; + + // This field is filled in if the server is sending back a heartbeat message. + // Heartbeat messages are sent back to the client to prevent the scanner from + // timing out. Seeing a heartbeat message communicates to the Client that the + // server would have continued to scan had the time limit not been reached. + optional bool heartbeat_message = 9; + + // This field is filled in if the client has requested that scan metrics be tracked. + // The metrics tracked here are sent back to the client to be tracked together with + // the existing client side metrics. + optional ScanMetrics scan_metrics = 10; +} + +/** + * Atomically bulk load multiple HFiles (say from different column families) + * into an open region. + */ +message BulkLoadHFileRequest { + required RegionSpecifier region = 1; + repeated FamilyPath family_path = 2; + optional bool assign_seq_num = 3; + + message FamilyPath { + required bytes family = 1; + required string path = 2; + } +} + +message BulkLoadHFileResponse { + required bool loaded = 1; +} + +message CoprocessorServiceCall { + required bytes row = 1; + required string service_name = 2; + required string method_name = 3; + required bytes request = 4; +} + +message CoprocessorServiceResult { + optional NameBytesPair value = 1; +} + +message CoprocessorServiceRequest { + required RegionSpecifier region = 1; + required CoprocessorServiceCall call = 2; +} + +message CoprocessorServiceResponse { + required RegionSpecifier region = 1; + required NameBytesPair value = 2; +} + +// Either a Get or a Mutation +message Action { + // If part of a multi action, useful aligning + // result with what was originally submitted. + optional uint32 index = 1; + optional MutationProto mutation = 2; + optional Get get = 3; + optional CoprocessorServiceCall service_call = 4; +} + +/** + * Actions to run against a Region. + */ +message RegionAction { + required RegionSpecifier region = 1; + // When set, run mutations as atomic unit. + optional bool atomic = 2; + repeated Action action = 3; +} + +/* +* Statistics about the current load on the region +*/ +message RegionLoadStats { + // Percent load on the memstore. Guaranteed to be positive, between 0 and 100. + optional int32 memstoreLoad = 1 [default = 0]; + // Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + // We can move this to "ServerLoadStats" should we develop them. + optional int32 heapOccupancy = 2 [default = 0]; + // Compaction pressure. Guaranteed to be positive, between 0 and 100. + optional int32 compactionPressure = 3 [default = 0]; +} + +message MultiRegionLoadStats{ + repeated RegionSpecifier region = 1; + repeated RegionLoadStats stat = 2; +} + +/** + * Either a Result or an Exception NameBytesPair (keyed by + * exception name whose value is the exception stringified) + * or maybe empty if no result and no exception. + */ +message ResultOrException { + // If part of a multi call, save original index of the list of all + // passed so can align this response w/ original request. + optional uint32 index = 1; + optional Result result = 2; + optional NameBytesPair exception = 3; + // result if this was a coprocessor service call + optional CoprocessorServiceResult service_result = 4; + // current load on the region + optional RegionLoadStats loadStats = 5 [deprecated=true]; +} + +/** + * The result of a RegionAction. + */ +message RegionActionResult { + repeated ResultOrException resultOrException = 1; + // If the operation failed globally for this region, this exception is set + optional NameBytesPair exception = 2; +} + +/** + * Execute a list of actions on a given region in order. + * Nothing prevents a request to contains a set of RegionAction on the same region. + * For this reason, the matching between the MultiRequest and the MultiResponse is not + * done by the region specifier but by keeping the order of the RegionActionResult vs. + * the order of the RegionAction. + */ +message MultiRequest { + repeated RegionAction regionAction = 1; + optional uint64 nonceGroup = 2; + optional Condition condition = 3; +} + +message MultiResponse { + repeated RegionActionResult regionActionResult = 1; + // used for mutate to indicate processed only + optional bool processed = 2; + optional MultiRegionLoadStats regionStatistics = 3; +} + + +service ClientService { + rpc Get(GetRequest) + returns(GetResponse); + + rpc Mutate(MutateRequest) + returns(MutateResponse); + + rpc Scan(ScanRequest) + returns(ScanResponse); + + rpc BulkLoadHFile(BulkLoadHFileRequest) + returns(BulkLoadHFileResponse); + + rpc ExecService(CoprocessorServiceRequest) + returns(CoprocessorServiceResponse); + + rpc ExecRegionServerService(CoprocessorServiceRequest) + returns(CoprocessorServiceResponse); + + rpc Multi(MultiRequest) + returns(MultiResponse); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/ClusterId.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/ClusterId.proto b/hbase-native-client/src/hbase/if/ClusterId.proto new file mode 100644 index 0000000..aed8cfc --- /dev/null +++ b/hbase-native-client/src/hbase/if/ClusterId.proto @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers that are shared throughout HBase +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "ClusterIdProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +/** + * Content of the '/hbase/hbaseid', cluster id, znode. + * Also cluster of the ${HBASE_ROOTDIR}/hbase.id file. + */ +message ClusterId { + // This is the cluster id, a uuid as a String + required string cluster_id = 1; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/if/ClusterStatus.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/ClusterStatus.proto b/hbase-native-client/src/hbase/if/ClusterStatus.proto new file mode 100644 index 0000000..54bc0c3 --- /dev/null +++ b/hbase-native-client/src/hbase/if/ClusterStatus.proto @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers that are used for ClustStatus +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "ClusterStatusProtos"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; +import "ClusterId.proto"; +import "FS.proto"; + +message RegionState { + required RegionInfo region_info = 1; + required State state = 2; + optional uint64 stamp = 3; + enum State { + OFFLINE = 0; // region is in an offline state + PENDING_OPEN = 1; // sent rpc to server to open but has not begun + OPENING = 2; // server has begun to open but not yet done + OPEN = 3; // server opened region and updated meta + PENDING_CLOSE = 4; // sent rpc to server to close but has not begun + CLOSING = 5; // server has begun to close but not yet done + CLOSED = 6; // server closed region and updated meta + SPLITTING = 7; // server started split of a region + SPLIT = 8; // server completed split of a region + FAILED_OPEN = 9; // failed to open, and won't retry any more + FAILED_CLOSE = 10; // failed to close, and won't retry any more + MERGING = 11; // server started merge a region + MERGED = 12; // server completed merge of a region + SPLITTING_NEW = 13; // new region to be created when RS splits a parent + // region but hasn't be created yet, or master doesn't + // know it's already created + MERGING_NEW = 14; // new region to be created when RS merges two + // daughter regions but hasn't be created yet, or + // master doesn't know it's already created + } +} + +message RegionInTransition { + required RegionSpecifier spec = 1; + required RegionState region_state = 2; +} + +/** + * sequence Id of a store + */ +message StoreSequenceId { + required bytes family_name = 1; + required uint64 sequence_id = 2; +} + +/** + * contains a sequence id of a region which should be the minimum of its store sequence ids and + * list of sequence ids of the region's stores + */ +message RegionStoreSequenceIds { + required uint64 last_flushed_sequence_id = 1; + repeated StoreSequenceId store_sequence_id = 2; +} + +message RegionLoad { + /** the region specifier */ + required RegionSpecifier region_specifier = 1; + + /** the number of stores for the region */ + optional uint32 stores = 2; + + /** the number of storefiles for the region */ + optional uint32 storefiles = 3; + + /** the total size of the store files for the region, uncompressed, in MB */ + optional uint32 store_uncompressed_size_MB = 4; + + /** the current total size of the store files for the region, in MB */ + optional uint32 storefile_size_MB = 5; + + /** the current size of the memstore for the region, in MB */ + optional uint32 memstore_size_MB = 6; + + /** + * The current total size of root-level store file indexes for the region, + * in MB. The same as {@link #rootIndexSizeKB} but in MB. + */ + optional uint32 storefile_index_size_MB = 7; + + /** the current total read requests made to region */ + optional uint64 read_requests_count = 8; + + /** the current total write requests made to region */ + optional uint64 write_requests_count = 9; + + /** the total compacting key values in currently running compaction */ + optional uint64 total_compacting_KVs = 10; + + /** the completed count of key values in currently running compaction */ + optional uint64 current_compacted_KVs = 11; + + /** The current total size of root-level indexes for the region, in KB. */ + optional uint32 root_index_size_KB = 12; + + /** The total size of all index blocks, not just the root level, in KB. */ + optional uint32 total_static_index_size_KB = 13; + + /** + * The total size of all Bloom filter blocks, not just loaded into the + * block cache, in KB. + */ + optional uint32 total_static_bloom_size_KB = 14; + + /** the most recent sequence Id from cache flush */ + optional uint64 complete_sequence_id = 15; + + /** The current data locality for region in the regionserver */ + optional float data_locality = 16; + + optional uint64 last_major_compaction_ts = 17 [default = 0]; + + /** the most recent sequence Id of store from cache flush */ + repeated StoreSequenceId store_complete_sequence_id = 18; + + /** the current total filtered read requests made to region */ + optional uint64 filtered_read_requests_count = 19; +} + +/* Server-level protobufs */ + +message ReplicationLoadSink { + required uint64 ageOfLastAppliedOp = 1; + required uint64 timeStampsOfLastAppliedOp = 2; +} + +message ReplicationLoadSource { + required string peerID = 1; + required uint64 ageOfLastShippedOp = 2; + required uint32 sizeOfLogQueue = 3; + required uint64 timeStampOfLastShippedOp = 4; + required uint64 replicationLag = 5; +} + +message ServerLoad { + /** Number of requests since last report. */ + optional uint64 number_of_requests = 1; + + /** Total Number of requests from the start of the region server. */ + optional uint64 total_number_of_requests = 2; + + /** the amount of used heap, in MB. */ + optional uint32 used_heap_MB = 3; + + /** the maximum allowable size of the heap, in MB. */ + optional uint32 max_heap_MB = 4; + + /** Information on the load of individual regions. */ + repeated RegionLoad region_loads = 5; + + /** + * Regionserver-level coprocessors, e.g., WALObserver implementations. + * Region-level coprocessors, on the other hand, are stored inside RegionLoad + * objects. + */ + repeated Coprocessor coprocessors = 6; + + /** + * Time when incremental (non-total) counts began being calculated (e.g. number_of_requests) + * time is measured as the difference, measured in milliseconds, between the current time + * and midnight, January 1, 1970 UTC. + */ + optional uint64 report_start_time = 7; + + /** + * Time when report was generated. + * time is measured as the difference, measured in milliseconds, between the current time + * and midnight, January 1, 1970 UTC. + */ + optional uint64 report_end_time = 8; + + /** + * The port number that this region server is hosing an info server on. + */ + optional uint32 info_server_port = 9; + + /** + * The replicationLoadSource for the replication Source status of this region server. + */ + repeated ReplicationLoadSource replLoadSource = 10; + + /** + * The replicationLoadSink for the replication Sink status of this region server. + */ + optional ReplicationLoadSink replLoadSink = 11; +} + +message LiveServerInfo { + required ServerName server = 1; + required ServerLoad server_load = 2; +} + +message ClusterStatus { + optional HBaseVersionFileContent hbase_version = 1; + repeated LiveServerInfo live_servers = 2; + repeated ServerName dead_servers = 3; + repeated RegionInTransition regions_in_transition = 4; + optional ClusterId cluster_id = 5; + repeated Coprocessor master_coprocessors = 6; + optional ServerName master = 7; + repeated ServerName backup_masters = 8; + optional bool balancer_on = 9; +}