http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/request.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc deleted file mode 100644 index 8983726..0000000 --- a/hbase-native-client/connection/request.cc +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "connection/request.h" - -#include "if/Client.pb.h" - -namespace hbase { - -Request::Request(std::shared_ptr<google::protobuf::Message> req, - std::shared_ptr<google::protobuf::Message> resp, std::string method) - : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {} - -std::unique_ptr<Request> Request::get() { - return std::make_unique<Request>(std::make_shared<hbase::pb::GetRequest>(), - std::make_shared<hbase::pb::GetResponse>(), "Get"); -} -std::unique_ptr<Request> Request::mutate() { - return std::make_unique<Request>(std::make_shared<hbase::pb::MutateRequest>(), - std::make_shared<hbase::pb::MutateResponse>(), "Mutate"); -} -std::unique_ptr<Request> Request::scan() { - return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(), - std::make_shared<hbase::pb::ScanResponse>(), "Scan"); -} -std::unique_ptr<Request> Request::multi() { - return std::make_unique<Request>(std::make_shared<hbase::pb::MultiRequest>(), - std::make_shared<hbase::pb::MultiResponse>(), "Multi"); -} -} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h deleted file mode 100644 index 4b652c0..0000000 --- a/hbase-native-client/connection/request.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/Conv.h> -#include <google/protobuf/message.h> - -#include <cstdint> -#include <memory> -#include <string> - -namespace hbase { - -/** - * Main request class. - * This holds the request object and the un-filled in approriatley typed - * response object. - */ -class Request { - public: - /** Create a request object for a get */ - static std::unique_ptr<Request> get(); - /** Create a request object for a mutate */ - static std::unique_ptr<Request> mutate(); - /** Create a request object for a scan */ - static std::unique_ptr<Request> scan(); - /** Create a request object for a multi */ - static std::unique_ptr<Request> multi(); - - /** - * This should be private. Do not use this. - * - * - * Constructor that's public for make_unique. This sets all the messages and - * method name. - */ - Request(std::shared_ptr<google::protobuf::Message> req, - std::shared_ptr<google::protobuf::Message> resp, std::string method); - - /** Get the call id. */ - uint32_t call_id() { return call_id_; } - /** Set the call id. This should only be set once. */ - void set_call_id(uint32_t call_id) { call_id_ = call_id; } - /** Get the backing request protobuf message. */ - std::shared_ptr<google::protobuf::Message> req_msg() { return req_msg_; } - /** Get the backing response protobuf message. */ - std::shared_ptr<google::protobuf::Message> resp_msg() { return resp_msg_; } - /** Get the method name. This is used to the the receiving rpc server what - * method type to decode. */ - std::string method() { return method_; } - - std::string DebugString() { - return "call_id:" + folly::to<std::string>(call_id_) + ", req_msg:" + - req_msg_->ShortDebugString() + ", method:" + method_; - } - - private: - uint32_t call_id_; - std::shared_ptr<google::protobuf::Message> req_msg_ = nullptr; - std::shared_ptr<google::protobuf::Message> resp_msg_ = nullptr; - std::string method_ = "Get"; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/response.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/response.h b/hbase-native-client/connection/response.h deleted file mode 100644 index 38fdda0..0000000 --- a/hbase-native-client/connection/response.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/Conv.h> -#include <folly/ExceptionWrapper.h> - -#include <cstdint> -#include <memory> -#include <string> -#include <utility> - -#include "serde/cell-scanner.h" - -// Forward -namespace google { -namespace protobuf { -class Message; -} -} - -namespace hbase { - -/** - * @brief Class representing a rpc response - * - * This is the class sent to a service. - */ -class Response { - public: - /** - * Constructor. - * Initinalizes the call id to 0. 0 should never be a valid call id. - */ - Response() : call_id_(0), resp_msg_(nullptr), cell_scanner_(nullptr), exception_(nullptr) {} - - /** Get the call_id */ - uint32_t call_id() { return call_id_; } - - /** Set the call_id */ - void set_call_id(uint32_t call_id) { call_id_ = call_id; } - - /** - * Get the response message. - * The caller is reponsible for knowing the type. In practice the call id is - * used to figure out the type. - */ - std::shared_ptr<google::protobuf::Message> resp_msg() const { return resp_msg_; } - - /** Set the response message. */ - void set_resp_msg(std::shared_ptr<google::protobuf::Message> response) { - resp_msg_ = std::move(response); - } - - void set_cell_scanner(std::shared_ptr<CellScanner> cell_scanner) { cell_scanner_ = cell_scanner; } - - const std::shared_ptr<CellScanner> cell_scanner() const { return cell_scanner_; } - - folly::exception_wrapper exception() { return exception_; } - - void set_exception(folly::exception_wrapper value) { exception_ = value; } - - std::string DebugString() const { - std::string s{"call_id:"}; - s += folly::to<std::string>(call_id_); - s += ", resp_msg:"; - s += resp_msg_->ShortDebugString(); - return s; - } - - private: - uint32_t call_id_; - std::shared_ptr<google::protobuf::Message> resp_msg_; - std::shared_ptr<CellScanner> cell_scanner_; - folly::exception_wrapper exception_; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc deleted file mode 100644 index 51c9c63..0000000 --- a/hbase-native-client/connection/rpc-client.cc +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "connection/rpc-client.h" - -#include <folly/Format.h> -#include <folly/Logging.h> -#include <folly/futures/Future.h> -#include <unistd.h> -#include <wangle/concurrent/IOThreadPoolExecutor.h> -#include "exceptions/exception.h" - -using hbase::security::User; -using std::chrono::nanoseconds; - -namespace hbase { - -RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, - std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf, - nanoseconds connect_timeout) - : io_executor_(io_executor), conf_(conf) { - cp_ = std::make_shared<ConnectionPool>(io_executor_, cpu_executor, codec, conf, connect_timeout); -} - -void RpcClient::Close() { io_executor_->stop(); } - -std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket) { - return AsyncCall(host, port, std::move(req), ticket).get(); -} - -std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket, - const std::string& service_name) { - return AsyncCall(host, port, std::move(req), ticket, service_name).get(); -} - -folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host, - uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket) { - auto remote_id = std::make_shared<ConnectionId>(host, port, ticket); - return SendRequest(remote_id, std::move(req)); -} - -folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host, - uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<User> ticket, - const std::string& service_name) { - auto remote_id = std::make_shared<ConnectionId>(host, port, ticket, service_name); - return SendRequest(remote_id, std::move(req)); -} - -/** - * There are two cases for ConnectionException: - * 1. The first time connection - * establishment, i.e. GetConnection(remote_id), AsyncSocketException being a cause. - * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest, AsyncSocketException being - * a cause as well. - */ -folly::Future<std::unique_ptr<Response>> RpcClient::SendRequest( - std::shared_ptr<ConnectionId> remote_id, std::unique_ptr<Request> req) { - try { - return GetConnection(remote_id) - ->SendRequest(std::move(req)) - .onError([&, this](const folly::exception_wrapper& ew) { - VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what()); - ew.with_exception([&, this](const hbase::ConnectionException& re) { - /* bad connection, remove it from pool. */ - cp_->Close(remote_id); - }); - return GetFutureWithException(ew); - }); - } catch (const ConnectionException& e) { - CHECK(e.cause().get_exception() != nullptr); - VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what()); - /* bad connection, remove it from pool. */ - cp_->Close(remote_id); - return GetFutureWithException(e); - } -} - -template <typename EXCEPTION> -folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(const EXCEPTION& e) { - return GetFutureWithException(folly::exception_wrapper{e}); -} - -folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException( - const folly::exception_wrapper& ew) { - folly::Promise<std::unique_ptr<Response>> promise; - auto future = promise.getFuture(); - promise.setException(ew); - return future; -} - -std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) { - return cp_->GetConnection(remote_id); -} -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h deleted file mode 100644 index 93801d8..0000000 --- a/hbase-native-client/connection/rpc-client.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <google/protobuf/service.h> - -#include <folly/ExceptionWrapper.h> -#include <chrono> -#include <memory> -#include <string> -#include <utility> - -#include "connection/connection-id.h" -#include "connection/connection-pool.h" -#include "connection/request.h" -#include "connection/response.h" -#include "security/user.h" - -namespace hbase { - -class RpcClient { - public: - RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, - std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf, - std::chrono::nanoseconds connect_timeout = std::chrono::nanoseconds(0)); - - virtual ~RpcClient() { Close(); } - - virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<security::User> ticket); - - virtual std::unique_ptr<Response> SyncCall(const std::string &host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<security::User> ticket, - const std::string &service_name); - - virtual folly::Future<std::unique_ptr<Response>> AsyncCall( - const std::string &host, uint16_t port, std::unique_ptr<Request> req, - std::shared_ptr<security::User> ticket); - - virtual folly::Future<std::unique_ptr<Response>> AsyncCall(const std::string &host, uint16_t port, - std::unique_ptr<Request> req, - std::shared_ptr<security::User> ticket, - const std::string &service_name); - - virtual void Close(); - - std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; } - - private: - std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id); - folly::Future<std::unique_ptr<Response>> SendRequest(std::shared_ptr<ConnectionId> remote_id, - std::unique_ptr<Request> req); - template <typename EXCEPTION> - folly::Future<std::unique_ptr<Response>> GetFutureWithException(const EXCEPTION &e); - - folly::Future<std::unique_ptr<Response>> GetFutureWithException( - const folly::exception_wrapper &ew); - - private: - std::shared_ptr<ConnectionPool> cp_; - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; - std::shared_ptr<Configuration> conf_; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h deleted file mode 100644 index 9063280..0000000 --- a/hbase-native-client/connection/rpc-connection.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <memory> -#include <mutex> -#include <utility> - -#include "connection/connection-factory.h" -#include "connection/connection-id.h" -#include "connection/request.h" -#include "connection/response.h" -#include "connection/service.h" - -namespace hbase { - -class RpcConnection : public std::enable_shared_from_this<RpcConnection> { - public: - RpcConnection(std::shared_ptr<ConnectionId> connection_id, std::shared_ptr<ConnectionFactory> cf) - : connection_id_(connection_id), cf_(cf), hbase_service_(nullptr) {} - - virtual ~RpcConnection() { Close(); } - - virtual std::shared_ptr<ConnectionId> remote_id() const { return connection_id_; } - - virtual folly::Future<std::unique_ptr<Response>> SendRequest(std::unique_ptr<Request> req) { - std::lock_guard<std::recursive_mutex> lock(mutex_); - if (hbase_service_ == nullptr) { - Connect(); - } - VLOG(5) << "Calling RpcConnection::SendRequest()"; // TODO - return (*hbase_service_)(std::move(req)); - } - - virtual void Close() { - std::lock_guard<std::recursive_mutex> lock(mutex_); - if (hbase_service_) { - hbase_service_->close(); - hbase_service_ = nullptr; - } - if (client_bootstrap_) { - client_bootstrap_ = nullptr; - } - } - - private: - void Connect() { - client_bootstrap_ = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(shared_from_this(), client_bootstrap_, remote_id()->host(), - remote_id()->port()); - hbase_service_ = std::move(dispatcher); - } - - private: - std::recursive_mutex mutex_; - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; - std::shared_ptr<ConnectionId> connection_id_; - std::shared_ptr<HBaseService> hbase_service_; - std::shared_ptr<ConnectionFactory> cf_; - std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client_bootstrap_; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector-inl.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-fault-injector-inl.h b/hbase-native-client/connection/rpc-fault-injector-inl.h deleted file mode 100644 index 8bbaddf..0000000 --- a/hbase-native-client/connection/rpc-fault-injector-inl.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -namespace hbase { - -template <typename T> -std::shared_ptr<T> RpcFaultInjector<T>::instance = std::make_shared<T>(); - -template <typename T> -RpcFaultInjector<T>::RpcFaultInjector() {} - -template <typename T> -RpcFaultInjector<T>::~RpcFaultInjector() {} - -template <typename T> -std::shared_ptr<T> RpcFaultInjector<T>::Get() { - return instance; -} - -template <typename T> -void RpcFaultInjector<T>::Set(std::shared_ptr<T> injector) { - instance = injector; -} -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-fault-injector.cc b/hbase-native-client/connection/rpc-fault-injector.cc deleted file mode 100644 index 16e2034..0000000 --- a/hbase-native-client/connection/rpc-fault-injector.cc +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "rpc-fault-injector.h" - -namespace hbase {} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-fault-injector.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-fault-injector.h b/hbase-native-client/connection/rpc-fault-injector.h deleted file mode 100644 index 2733b7d..0000000 --- a/hbase-native-client/connection/rpc-fault-injector.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/io/async/AsyncTransport.h> -#include "connection/pipeline.h" - -namespace hbase { - -template <typename T> -class RpcFaultInjector { - public: - RpcFaultInjector(); - virtual ~RpcFaultInjector(); - - static std::shared_ptr<T> Get(); - static void Set(std::shared_ptr<T> instance); - - private: - static std::shared_ptr<T> instance; -}; - -class RpcClientFaultInjector : public RpcFaultInjector<RpcClientFaultInjector> { - public: - RpcClientFaultInjector() {} - virtual ~RpcClientFaultInjector() {} - /** - * Here goes virtual functions for injecting various faults. They should be no-ops by default. - * Sub classes of RpcClientFaultInjector will override by providing concrete faults. - */ -}; -} /* namespace hbase */ - -#include "connection/rpc-fault-injector-inl.h" http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc deleted file mode 100644 index 8e405ef..0000000 --- a/hbase-native-client/connection/rpc-test-server-handler.cc +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "connection/rpc-test-server-handler.h" -#include "if/RPC.pb.h" -#include "if/test.pb.h" - -namespace hbase { - -void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) { - buf->coalesce(); - pb::RequestHeader header; - - int used_bytes = serde_.ParseDelimited(buf.get(), &header); - VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id(); - - auto received = CreateReceivedRequest(header.method_name()); - - buf->trimStart(used_bytes); - if (header.has_request_param() && received != nullptr) { - used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get()); - VLOG(3) << "Read RPCRequest, buf length:" << buf->length() - << ", header PB length:" << used_bytes; - received->set_call_id(header.call_id()); - } - - if (received != nullptr) { - ctx->fireRead(std::move(received)); - } -} - -folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx, - std::unique_ptr<Response> resp) { - VLOG(3) << "Writing RPC Request"; - // Send the data down the pipeline. - return ctx->fireWrite( - serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception())); -} - -std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest( - const std::string& method_name) { - std::unique_ptr<Request> result = nullptr; - - if (method_name == "ping") { - result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method_name); - } else if (method_name == "echo") { - result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(), - std::make_shared<EchoResponseProto>(), method_name); - } else if (method_name == "error") { - result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method_name); - } else if (method_name == "pause") { - result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(), - std::make_shared<EmptyResponseProto>(), method_name); - } else if (method_name == "addr") { - result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<AddrResponseProto>(), method_name); - } else if (method_name == "socketNotOpen") { - result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method_name); - } - return result; -} -} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h deleted file mode 100644 index ab0264f..0000000 --- a/hbase-native-client/connection/rpc-test-server-handler.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <wangle/channel/Handler.h> - -#include "connection/request.h" -#include "connection/response.h" -#include "serde/rpc-serde.h" - -using namespace hbase; - -namespace hbase { -// A real rpc server would probably use generated client/server stubs -class RpcTestServerSerializeHandler - : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>, - std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> { - public: - RpcTestServerSerializeHandler() : serde_() {} - - void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override; - - folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> resp) override; - - private: - std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name); - - private: - hbase::RpcSerde serde_; -}; -} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc deleted file mode 100644 index 157ea71..0000000 --- a/hbase-native-client/connection/rpc-test-server.cc +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <wangle/channel/AsyncSocketHandler.h> -#include <wangle/channel/EventBaseHandler.h> -#include <wangle/codec/LengthFieldBasedFrameDecoder.h> -#include <wangle/codec/LengthFieldPrepender.h> -#include <wangle/service/ServerDispatcher.h> - -#include "connection/rpc-test-server-handler.h" -#include "connection/rpc-test-server.h" -#include "if/test.pb.h" - -namespace hbase { - -RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline( - std::shared_ptr<AsyncTransportWrapper> sock) { - if (service_ == nullptr) { - initService(sock); - } - CHECK(service_ != nullptr); - - auto pipeline = RpcTestServerSerializePipeline::create(); - pipeline->addBack(AsyncSocketHandler(sock)); - // ensure we can write from any thread - pipeline->addBack(EventBaseHandler()); - pipeline->addBack(LengthFieldBasedFrameDecoder()); - pipeline->addBack(RpcTestServerSerializeHandler()); - pipeline->addBack(MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>( - service_.get())); - pipeline->finalize(); - - return pipeline; -} - -void RpcTestServerPipelineFactory::initService(std::shared_ptr<AsyncTransportWrapper> sock) { - /* get server address */ - SocketAddress localAddress; - sock->getLocalAddress(&localAddress); - - /* init service with server address */ - service_ = std::make_shared<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>>( - std::make_shared<CPUThreadPoolExecutor>(1), - std::make_shared<RpcTestService>(std::make_shared<SocketAddress>(localAddress))); -} - -Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) { - /* build Response */ - auto response = std::make_unique<Response>(); - response->set_call_id(request->call_id()); - std::string method_name = request->method(); - - if (method_name == "ping") { - auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); - response->set_resp_msg(pb_resp_msg); - VLOG(1) << "RPC server:" - << " ping called."; - - } else if (method_name == "echo") { - auto pb_resp_msg = std::make_shared<EchoResponseProto>(); - /* get msg from client */ - auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); - pb_resp_msg->set_message(pb_req_msg->message()); - response->set_resp_msg(pb_resp_msg); - VLOG(1) << "RPC server:" - << " echo called, " << pb_req_msg->message(); - - } else if (method_name == "error") { - auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); - response->set_resp_msg(pb_resp_msg); - VLOG(1) << "RPC server:" - << " error called."; - response->set_exception(RpcTestException("server error!")); - - } else if (method_name == "pause") { - auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); - /* sleeping */ - auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); - std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms())); - response->set_resp_msg(pb_resp_msg); - VLOG(1) << "RPC server:" - << " pause called, " << pb_req_msg->ms() << " ms"; - - } else if (method_name == "addr") { - // TODO: - } else if (method_name == "socketNotOpen") { - auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); - response->set_resp_msg(pb_resp_msg); - } - - return folly::makeFuture<std::unique_ptr<Response>>(std::move(response)); -} -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test-server.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h deleted file mode 100644 index 955560e..0000000 --- a/hbase-native-client/connection/rpc-test-server.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once -#include <folly/SocketAddress.h> -#include <wangle/concurrent/CPUThreadPoolExecutor.h> -#include <wangle/service/ExecutorFilter.h> -#include <wangle/service/Service.h> - -#include "connection/request.h" -#include "connection/response.h" -#include "exceptions/exception.h" - -using namespace hbase; -using namespace folly; -using namespace wangle; - -namespace hbase { -using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>; - -class RpcTestException : public IOException { - public: - RpcTestException() {} - RpcTestException(const std::string& what) : IOException(what) {} - RpcTestException(const std::string& what, const folly::exception_wrapper& cause) - : IOException(what, cause) {} - RpcTestException(const folly::exception_wrapper& cause) : IOException("", cause) {} -}; - -class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> { - public: - RpcTestService(std::shared_ptr<folly::SocketAddress> socket_address) - : socket_address_(socket_address) {} - virtual ~RpcTestService() = default; - Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override; - - private: - std::shared_ptr<folly::SocketAddress> socket_address_; -}; - -class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> { - public: - RpcTestServerSerializePipeline::Ptr newPipeline( - std::shared_ptr<AsyncTransportWrapper> sock) override; - - private: - void initService(std::shared_ptr<AsyncTransportWrapper> sock); - - private: - std::shared_ptr<ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>>> service_{ - nullptr}; -}; -} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/rpc-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc deleted file mode 100644 index 8624e72..0000000 --- a/hbase-native-client/connection/rpc-test.cc +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <wangle/bootstrap/ClientBootstrap.h> -#include <wangle/channel/Handler.h> - -#include <folly/Format.h> -#include <folly/Logging.h> -#include <folly/SocketAddress.h> -#include <folly/String.h> -#include <folly/experimental/TestUtil.h> -#include <folly/io/async/AsyncSocketException.h> -#include <gflags/gflags.h> -#include <glog/logging.h> -#include <gtest/gtest.h> -#include <boost/thread.hpp> -#include <chrono> - -#include "connection/rpc-client.h" -#include "exceptions/exception.h" -#include "if/test.pb.h" -#include "rpc-test-server.h" -#include "security/user.h" -#include "serde/rpc-serde.h" - -using namespace wangle; -using namespace folly; -using namespace hbase; -using namespace std::chrono; - -DEFINE_int32(port, 0, "test server port"); -DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result"); -DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.", - "output format of enforcing fail with exception"); -DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.", - "output format of enforcing fail without exception"); -typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap; -typedef std::shared_ptr<ServerTestBootstrap> ServerPtr; - -class RpcTest : public ::testing::Test { - public: - static void SetUpTestCase() { google::InstallFailureSignalHandler(); } -}; - -std::shared_ptr<Configuration> CreateConf() { - auto conf = std::make_shared<Configuration>(); - conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true"); - return conf; -} - -ServerPtr CreateRpcServer() { - /* create rpc test server */ - auto server = std::make_shared<ServerTestBootstrap>(); - server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>()); - server->bind(FLAGS_port); - return server; -} - -std::shared_ptr<folly::SocketAddress> GetRpcServerAddress(ServerPtr server) { - auto addr = std::make_shared<folly::SocketAddress>(); - server->getSockets()[0]->getAddress(addr.get()); - return addr; -} - -std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf) { - auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1); - auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1); - auto client = std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf); - return client; -} - -std::shared_ptr<RpcClient> CreateRpcClient(std::shared_ptr<Configuration> conf, - std::chrono::nanoseconds connect_timeout) { - auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1); - auto cpu_executor = std::make_shared<wangle::CPUThreadPoolExecutor>(1); - auto client = - std::make_shared<RpcClient>(io_executor, cpu_executor, nullptr, conf, connect_timeout); - return client; -} - -/** -* test ping -*/ -TEST_F(RpcTest, Ping) { - auto conf = CreateConf(); - auto server = CreateRpcServer(); - auto server_addr = GetRpcServerAddress(server); - auto client = CreateRpcClient(conf); - - auto method = "ping"; - auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method); - - /* sending out request */ - client - ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([&](std::unique_ptr<Response> response) { - auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg()); - EXPECT_TRUE(pb_resp != nullptr); - VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); - }) - .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); - }) - .get(); - - server->stop(); - server->join(); -} - -/** - * test echo - */ -TEST_F(RpcTest, Echo) { - auto conf = CreateConf(); - auto server = CreateRpcServer(); - auto server_addr = GetRpcServerAddress(server); - auto client = CreateRpcClient(conf); - - auto method = "echo"; - auto greetings = "hello, hbase server!"; - auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(), - std::make_shared<EchoResponseProto>(), method); - auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); - pb_msg->set_message(greetings); - - /* sending out request */ - client - ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([&](std::unique_ptr<Response> response) { - auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg()); - EXPECT_TRUE(pb_resp != nullptr); - VLOG(1) << folly::sformat(FLAGS_result_format, method, pb_resp->message()); - EXPECT_EQ(greetings, pb_resp->message()); - }) - .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); - }) - .get(); - - server->stop(); - server->join(); -} - -/** - * test error - */ -TEST_F(RpcTest, Error) { - auto conf = CreateConf(); - auto server = CreateRpcServer(); - auto server_addr = GetRpcServerAddress(server); - auto client = CreateRpcClient(conf); - - auto method = "error"; - auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method); - /* sending out request */ - client - ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([&](std::unique_ptr<Response> response) { - FAIL() << folly::sformat(FLAGS_fail_ex_format, method); - }) - .onError([&](const folly::exception_wrapper& ew) { - VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); - std::string kRemoteException = demangle(typeid(hbase::RemoteException)).toStdString(); - std::string kRpcTestException = demangle(typeid(hbase::RpcTestException)).toStdString(); - - /* verify exception_wrapper */ - EXPECT_TRUE(bool(ew)); - EXPECT_EQ(kRemoteException, ew.class_name()); - - /* verify exception */ - EXPECT_TRUE(ew.with_exception([&](const hbase::RemoteException& e) { - EXPECT_EQ(kRpcTestException, e.exception_class_name()); - EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace()); - })); - }) - .get(); - - server->stop(); - server->join(); -} - -TEST_F(RpcTest, SocketNotOpen) { - auto conf = CreateConf(); - auto server = CreateRpcServer(); - auto server_addr = GetRpcServerAddress(server); - auto client = CreateRpcClient(conf); - - auto method = "socketNotOpen"; - auto request = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), - std::make_shared<EmptyResponseProto>(), method); - - server->stop(); - server->join(); - - /* sending out request */ - client - ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([&](std::unique_ptr<Response> response) { - FAIL() << folly::sformat(FLAGS_fail_ex_format, method); - }) - .onError([&](const folly::exception_wrapper& ew) { - VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); - std::string kConnectionException = - demangle(typeid(hbase::ConnectionException)).toStdString(); - std::string kAsyncSocketException = - demangle(typeid(folly::AsyncSocketException)).toStdString(); - - /* verify exception_wrapper */ - EXPECT_TRUE(bool(ew)); - EXPECT_EQ(kConnectionException, ew.class_name()); - - /* verify exception */ - EXPECT_TRUE(ew.with_exception([&](const hbase::ConnectionException& e) { - EXPECT_TRUE(bool(e.cause())); - EXPECT_EQ(kAsyncSocketException, e.cause().class_name()); - VLOG(1) << folly::sformat(FLAGS_result_format, method, e.cause().what()); - e.cause().with_exception([&](const folly::AsyncSocketException& ase) { - EXPECT_EQ(AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN, ase.getType()); - EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno()); - }); - })); - }) - .get(); -} - -/** - * test pause - */ -TEST_F(RpcTest, Pause) { - int ms = 500; - - auto conf = CreateConf(); - auto server = CreateRpcServer(); - auto server_addr = GetRpcServerAddress(server); - auto client = - CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms))); - - auto method = "pause"; - auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(), - std::make_shared<EmptyResponseProto>(), method); - auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); - - pb_msg->set_ms(ms); - - /* sending out request */ - client - ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), - hbase::security::User::defaultUser()) - .then([&](std::unique_ptr<Response> response) { - auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg()); - EXPECT_TRUE(pb_resp != nullptr); - VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); - }) - .onError([&](const folly::exception_wrapper& ew) { - VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); - FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); - }) - .get(); - - server->stop(); - server->join(); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/sasl-handler.cc b/hbase-native-client/connection/sasl-handler.cc deleted file mode 100644 index 9afe1e2..0000000 --- a/hbase-native-client/connection/sasl-handler.cc +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "connection/sasl-handler.h" - -#include <glog/logging.h> -#include <sasl/sasl.h> -#include <sasl/saslplug.h> -#include <sasl/saslutil.h> - -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include <wangle/channel/Handler.h> - -#include <condition_variable> -#include <memory> -#include <mutex> -#include <string> -#include <utility> - -#include "connection/service.h" -#include "security/user.h" -using hbase::security::User; - -using std::chrono::nanoseconds; -using namespace folly; -using namespace wangle; -using namespace hbase; - -SaslHandler::SaslHandler(std::string user_name, std::shared_ptr<Configuration> conf) - : user_name_(user_name) { - host_name_.clear(); - secure_ = User::IsSecurityEnabled(*conf); - service_name_ = SaslUtil::ParseServiceName(conf, secure_); - sasl_connection_setup_started_.clear(); - sasl_connection_setup_in_progress_.store(true); -} - -SaslHandler::SaslHandler(const SaslHandler &hdlr) { - user_name_ = hdlr.user_name_; - service_name_ = hdlr.service_name_; - secure_ = hdlr.secure_; - host_name_ = hdlr.host_name_; - // copy-constructor sets the flags below to their initial state as opposed to getting them - // from the object this class is constructed from. That way, this instance is ready to do - // sasl stuff without issues, right from the SaslInit. Sharing a sasl session is not useful - // between two handler instances. - sasl_connection_setup_started_.clear(); - sasl_connection_setup_in_progress_.store(true); - sconn_ = nullptr; -} - -SaslHandler::~SaslHandler() { - if (nullptr != sconn_) { - sasl_dispose(&sconn_); - } - sconn_ = nullptr; -} - -void SaslHandler::transportActive(Context *ctx) { - // assign hostname; needed for the sasl handshake if secure - folly::SocketAddress address; - ctx->getTransport()->getPeerAddress(&address); - host_name_ = address.getHostStr(); - - // now init the sasl library; this is once per process - if (secure_) { - sasl_util_.InitializeSaslLib(); - } - // write the preamble to kick off the RPC handshake - VLOG(3) << "Writing RPC connection Preamble to server: " << host_name_; - auto preamble = RpcSerde::Preamble(secure_); - ctx->fireWrite(std::move(preamble)); - ctx->fireTransportActive(); -} - -void SaslHandler::read(Context *ctx, folly::IOBufQueue &buf) { - // if security is not on, or in case of security-on, if secure connection setup not in progress, - // pass it up without touching - if (!secure_ || !sasl_connection_setup_in_progress_.load()) { - ctx->fireRead(buf); - } else { - // message is for this handler; process it appropriately - ContinueSaslNegotiation(ctx, &buf); - } -} - -folly::Future<folly::Unit> SaslHandler::write(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { - // if security is on, and if secure connection setup in progress, - // this message is for this handler to process and respond - if (secure_ && sasl_connection_setup_in_progress_.load()) { - // store IOBuf which is to be sent to server after SASL handshake - iobuf_.push_back(std::move(buf)); - if (!sasl_connection_setup_started_.test_and_set()) { - // for the first incoming RPC from the higher layer, trigger sasl initialization - return SaslInit(ctx); - } else { - // for the subsequent incoming RPCs from the higher layer, just return empty future - folly::Promise<folly::Unit> p_; - return p_.getFuture(); - } - } - // pass the bytes recieved down without touching it - return ctx->fireWrite(std::move(buf)); -} - -folly::Future<folly::Unit> SaslHandler::WriteSaslOutput(Context *ctx, const char *out, - unsigned int outlen) { - int buffer_size = outlen + 4; - auto iob = IOBuf::create(buffer_size); - iob->append(buffer_size); - // Create the array output stream. - google::protobuf::io::ArrayOutputStream aos{iob->writableData(), buffer_size}; - std::unique_ptr<google::protobuf::io::CodedOutputStream> coded_output = - std::make_unique<google::protobuf::io::CodedOutputStream>(&aos); - uint32_t total_size = outlen; - total_size = ntohl(total_size); - coded_output->WriteRaw(&total_size, 4); - coded_output->WriteRaw(out, outlen); - return ctx->fireWrite(std::move(iob)); -} - -void SaslHandler::FinishAuth(Context *ctx, folly::IOBufQueue *bufQueue) { - std::unique_ptr<folly::IOBuf> iob; - if (!bufQueue->empty()) { - iob = bufQueue->pop_front(); - throw std::runtime_error("Error in the final step of handshake " + - std::string(reinterpret_cast<const char *>(iob->data()))); - } else { - sasl_connection_setup_in_progress_.store(false); - // write what we buffered - for (size_t i = 0; i < iobuf_.size(); i++) { - iob = std::move(iobuf_.at(i)); - ctx->fireWrite(std::move(iob)); - } - } -} - -folly::Future<folly::Unit> SaslHandler::SaslInit(Context *ctx) { - int rc; - const char *mechusing, *mechlist = "GSSAPI"; - const char *out; - unsigned int outlen; - - rc = sasl_client_new(service_name_.c_str(), /* The service we are using*/ - host_name_.c_str(), NULL, - NULL, /* Local and remote IP address strings - (NULL disables mechanisms which require this info)*/ - NULL, /*connection-specific callbacks*/ - 0 /*security flags*/, &sconn_); - if (rc != SASL_OK) { - LOG(FATAL) << "Cannot create client (" << rc << ") "; - throw std::runtime_error("Cannot create client"); - } - int curr_rc; - do { - curr_rc = sasl_client_start(sconn_, /* the same context from above */ - mechlist, /* the list of mechanisms from the server */ - NULL, /* filled in if an interaction is needed */ - &out, /* filled in on success */ - &outlen, /* filled in on success */ - &mechusing); - } while (curr_rc == SASL_INTERACT); /* the mechanism may ask us to fill - in things many times. result is SASL_CONTINUE on success */ - if (curr_rc != SASL_CONTINUE) { - throw std::runtime_error("Cannot start client (" + std::to_string(curr_rc) + ")"); - } - folly::Future<folly::Unit> fut = WriteSaslOutput(ctx, out, outlen); - return fut; -} - -void SaslHandler::ContinueSaslNegotiation(Context *ctx, folly::IOBufQueue *bufQueue) { - const char *out; - unsigned int outlen; - - int bytes_sent = 0; - int bytes_received = 0; - - std::unique_ptr<folly::IOBuf> iob = bufQueue->pop_front(); - bytes_received = iob->length(); - if (bytes_received == 0) { - throw std::runtime_error("Error in sasl handshake"); - } - folly::io::RWPrivateCursor c(iob.get()); - std::uint32_t status = c.readBE<std::uint32_t>(); - std::uint32_t sz = c.readBE<std::uint32_t>(); - - if (status != 0 /*Status 0 is success*/) { - // Assumption here is that the response from server is not more than 8 * 1024 - throw std::runtime_error("Error in sasl handshake " + - std::string(reinterpret_cast<char *>(c.writableData()))); - } - out = nullptr; - outlen = 0; - - int curr_rc = - sasl_client_step(sconn_, /* our context */ - reinterpret_cast<char *>(c.writableData()), /* the data from the server */ - sz, /* its length */ - NULL, /* this should be unallocated and NULL */ - &out, /* filled in on success */ - &outlen); /* filled in on success */ - - if (curr_rc == SASL_OK || curr_rc == SASL_CONTINUE) { - WriteSaslOutput(ctx, out, outlen); - } - if (curr_rc == SASL_OK) { - FinishAuth(ctx, bufQueue); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h deleted file mode 100644 index 81f4e81..0000000 --- a/hbase-native-client/connection/sasl-handler.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <glog/logging.h> -#include <sasl/sasl.h> -#include <sasl/saslplug.h> -#include <sasl/saslutil.h> - -#include <memory> -#include <string> -#include <vector> - -#include "connection/sasl-util.h" -#include "connection/service.h" -#include "security/user.h" -#include "serde/rpc-serde.h" - -namespace hbase { - -/** - * Class to perform SASL handshake with server (currently works with regionserver principals only) - * It is inserted between EventBaseHandler and LengthFieldBasedFrameDecoder in the pipeline - * SaslHandler would intercept writes to server by buffering the IOBuf's and start the handshake - * process - * (via sasl_client_XX calls provided by Cyrus) - * After handshake is complete, SaslHandler would send the buffered IOBuf's to server and - * act as pass-thru from then on - */ -class SaslHandler - : public wangle::HandlerAdapter<folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> { - public: - explicit SaslHandler(std::string user_name, std::shared_ptr<Configuration> conf); - SaslHandler(const SaslHandler& hdlr); - ~SaslHandler(); - - // from HandlerAdapter - void read(Context* ctx, folly::IOBufQueue& buf) override; - folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override; - void transportActive(Context* ctx) override; - - private: - // used by Cyrus - sasl_conn_t* sconn_ = nullptr; - std::string user_name_; - std::string service_name_; - std::string host_name_; - bool secure_; - std::atomic_flag sasl_connection_setup_started_; - std::atomic<bool> sasl_connection_setup_in_progress_{true}; - // vector of folly::IOBuf which buffers client writes before handshake is complete - std::vector<std::unique_ptr<folly::IOBuf>> iobuf_; - SaslUtil sasl_util_; - - // writes the output returned by sasl_client_XX to server - folly::Future<folly::Unit> WriteSaslOutput(Context* ctx, const char* out, unsigned int outlen); - folly::Future<folly::Unit> SaslInit(Context* ctx); - void FinishAuth(Context* ctx, folly::IOBufQueue* bufQueue); - void ContinueSaslNegotiation(Context* ctx, folly::IOBufQueue* buf); - std::string ParseServiceName(std::shared_ptr<Configuration> conf, bool secure); -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/sasl-util.cc b/hbase-native-client/connection/sasl-util.cc deleted file mode 100644 index ecaf015..0000000 --- a/hbase-native-client/connection/sasl-util.cc +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "connection/sasl-util.h" - -#include <glog/logging.h> -#include <sasl/sasl.h> -#include <sasl/saslplug.h> -#include <sasl/saslutil.h> - -#include <string> - -int SaslUtil::GetPluginPath(void *context __attribute__((unused)), const char **path) { - *path = getenv("SASL_PATH"); - - if (*path == NULL) { - *path = kDefaultPluginDir; - } - return SASL_OK; -} - -void *SaslUtil::MutexNew(void) { - auto m = new std::mutex(); - return m; -} - -int SaslUtil::MutexLock(void *m) { - (reinterpret_cast<std::mutex *>(m))->lock(); - return SASL_OK; -} - -int SaslUtil::MutexUnlock(void *m) { - (reinterpret_cast<std::mutex *>(m))->unlock(); - return SASL_OK; -} - -void SaslUtil::MutexDispose(void *m) { - std::mutex *mutex = reinterpret_cast<std::mutex *>(m); - delete mutex; -} - -std::once_flag SaslUtil::library_inited_; - -void SaslUtil::InitializeSaslLib() { - std::call_once(library_inited_, []() { - sasl_set_mutex(reinterpret_cast<sasl_mutex_alloc_t *>(&SaslUtil::MutexNew), - reinterpret_cast<sasl_mutex_lock_t *>(&SaslUtil::MutexLock), - reinterpret_cast<sasl_mutex_unlock_t *>(&SaslUtil::MutexUnlock), - reinterpret_cast<sasl_mutex_free_t *>(&SaslUtil::MutexDispose)); - static sasl_callback_t callbacks[] = { - {SASL_CB_GETPATH, (sasl_callback_ft)&SaslUtil::GetPluginPath, NULL}, - {SASL_CB_LIST_END, NULL, NULL}}; - int rc = sasl_client_init(callbacks); - if (rc != SASL_OK) { - throw std::runtime_error("Cannot initialize client " + std::to_string(rc)); - } - }); -} - -std::string SaslUtil::ParseServiceName(std::shared_ptr<hbase::Configuration> conf, bool secure) { - if (!secure) { - return std::string(); - } - std::string svrPrincipal = conf->Get(kServerPrincipalConfKey, ""); - // principal is of this form: hbase/23a039358...@example.com - // where 23a03935850c is the host (optional) - std::size_t pos = svrPrincipal.find("/"); - if (pos == std::string::npos && svrPrincipal.find("@") != std::string::npos) { - pos = svrPrincipal.find("@"); - } - if (pos == std::string::npos) { - throw std::runtime_error("Couldn't retrieve service principal from conf"); - } - VLOG(1) << "pos " << pos << " " << svrPrincipal; - std::string service_name = svrPrincipal.substr(0, pos); - return service_name; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/sasl-util.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/sasl-util.h b/hbase-native-client/connection/sasl-util.h deleted file mode 100644 index 4d58d9ee..0000000 --- a/hbase-native-client/connection/sasl-util.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <memory> -#include <mutex> -#include <string> - -#include "core/configuration.h" - -class SaslUtil { - public: - void InitializeSaslLib(void); - static std::string ParseServiceName(std::shared_ptr<hbase::Configuration> conf, bool secure); - - private: - static constexpr const char *kDefaultPluginDir = "/usr/lib/sasl2"; - // for now the sasl handler is hardcoded to work against the regionservers only. In the future, if - // we - // need the master rpc to work, we could have a map of service names to principals to use (similar - // to the Java implementation) - static constexpr const char *kServerPrincipalConfKey = "hbase.regionserver.kerberos.principal"; - - static int GetPluginPath(void *context, const char **path); - static void *MutexNew(void); - static int MutexLock(void *m); - static int MutexUnlock(void *m); - static void MutexDispose(void *m); - static std::once_flag library_inited_; -}; http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/connection/service.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/service.h b/hbase-native-client/connection/service.h deleted file mode 100644 index 64d4f07..0000000 --- a/hbase-native-client/connection/service.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <wangle/service/Service.h> - -#include <memory> - -#include "connection/request.h" -#include "connection/response.h" - -namespace hbase { -using HBaseService = wangle::Service<std::unique_ptr<Request>, std::unique_ptr<Response>>; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK deleted file mode 100644 index 76c836b..0000000 --- a/hbase-native-client/core/BUCK +++ /dev/null @@ -1,348 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This is the main library. -cxx_library( - name="core", - exported_headers=[ - "async-client-scanner.h", - "async-connection.h", - "async-region-locator.h", - "async-rpc-retrying-caller-factory.h", - "async-rpc-retrying-caller.h", - "async-table-result-scanner.h", - "client.h", - "cell.h", - "hbase-macros.h", - "filter.h", - "query.h", - "keyvalue-codec.h", - "region-location.h", - "location-cache.h", - "connection-configuration.h", - # TODO: move this out of exported - # Once meta lookup works - "meta-utils.h", - "get.h", - "increment.h", - "mutation.h", - "put.h", - "delete.h", - "scan.h", - "append.h", - "result.h", - "result-scanner.h", - "request-converter.h", - "response-converter.h", - "table.h", - "async-scan-rpc-retrying-caller.h", - "raw-async-table.h", - "raw-scan-result-consumer.h", - "scan-result-cache.h", - "hbase-rpc-controller.h", - "time-range.h", - "zk-util.h", - "action.h", - "multi-response.h", - "region-request.h", - "region-result.h", - "row.h", - "server-request.h", - "async-batch-rpc-retrying-caller.h", - ], - srcs=[ - "async-client-scanner.cc", - "async-connection.cc", - "async-rpc-retrying-caller-factory.cc", - "async-rpc-retrying-caller.cc", - "async-scan-rpc-retrying-caller.cc", - "async-table-result-scanner.cc", - "cell.cc", - "client.cc", - "hbase-rpc-controller.cc", - "keyvalue-codec.cc", - "location-cache.cc", - "meta-utils.cc", - "increment.cc", - "get.cc", - "mutation.cc", - "put.cc", - "delete.cc", - "scan.cc", - "append.cc", - "scan-result-cache.cc", - "raw-async-table.cc", - "result.cc", - "request-converter.cc", - "response-converter.cc", - "table.cc", - "time-range.cc", - "zk-util.cc", - "multi-response.cc", - "region-result.cc", - "async-batch-rpc-retrying-caller.cc", - ], - deps=[ - "//exceptions:exceptions", - "//utils:utils", - "//connection:connection", - "//core:conf", - "//if:if", - "//serde:serde", - "//third-party:folly", - "//third-party:wangle", - "//third-party:zookeeper_mt", - ], - compiler_flags=['-Weffc++', '-ggdb'], - visibility=[ - 'PUBLIC', - ],) -cxx_library( - name="conf", - exported_headers=[ - "configuration.h", - "hbase-configuration-loader.h", - ], - srcs=[ - "configuration.cc", - "hbase-configuration-loader.cc", - ], - deps=["//utils:utils", "//third-party:folly"], - compiler_flags=['-Weffc++', '-ggdb'], - visibility=[ - 'PUBLIC', - ],) -cxx_test( - name="location-cache-test", - srcs=[ - "location-cache-test.cc", - ], - deps=[ - ":core", - "//test-util:test-util", - ], - run_test_separately=True,) -cxx_test( - name="location-cache-retry-test", - srcs=[ - "location-cache-retry-test.cc", - ], - deps=[ - ":core", - "//if:if", - "//serde:serde", - "//test-util:test-util", - ], - run_test_separately=True,) -cxx_test( - name="cell-test", - srcs=[ - "cell-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="filter-test", - srcs=[ - "filter-test.cc", - ], - deps=[ - ":core", - "//if:if", - "//serde:serde", - "//test-util:test-util", - ], - run_test_separately=True,) -cxx_test( - name="get-test", - srcs=[ - "get-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="delete-test", - srcs=[ - "delete-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="increment-test", - srcs=[ - "increment-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="put-test", - srcs=[ - "put-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="append-test", - srcs=[ - "append-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="retry-test", - srcs=[ - "async-rpc-retrying-test.cc", - ], - deps=[ - ":core", - "//test-util:test-util", - "//exceptions:exceptions", - ], - run_test_separately=True,) -cxx_test( - name="time-range-test", - srcs=[ - "time-range-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="configuration-test", - srcs=[ - "configuration-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="hbase-configuration-test", - srcs=[ - "hbase-configuration-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="scan-test", - srcs=[ - "scan-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="result-test", - srcs=[ - "result-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="request-converter-test", - srcs=[ - "request-converter-test.cc", - ], - deps=[ - ":core", - "//connection:connection", - "//if:if", - ], - run_test_separately=True,) -cxx_test( - name="client-test", - srcs=[ - "client-test.cc", - ], - deps=[ - ":core", - "//if:if", - "//serde:serde", - "//test-util:test-util", - ], - run_test_separately=True,) -cxx_test( - name="scan-result-cache-test", - srcs=[ - "scan-result-cache-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="scanner-test", - srcs=[ - "scanner-test.cc", - ], - deps=[ - ":core", - "//if:if", - "//serde:serde", - "//test-util:test-util", - ], - run_test_separately=True,) -cxx_test( - name="zk-util-test", - srcs=[ - "zk-util-test.cc", - ], - deps=[ - ":core", - ], - run_test_separately=True,) -cxx_test( - name="multi-retry-test", - srcs=[ - "async-batch-rpc-retrying-test.cc", - ], - deps=[ - ":core", - "//test-util:test-util", - "//exceptions:exceptions", - ], - run_test_separately=True,) -cxx_binary( - name="simple-client", - srcs=[ - "simple-client.cc", - ], - deps=[":core", "//connection:connection"],) -cxx_binary( - name="load-client", - srcs=[ - "load-client.cc", - ], - deps=[":core", "//connection:connection"],) http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/action.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h deleted file mode 100644 index a00f079..0000000 --- a/hbase-native-client/core/action.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <memory> -#include "core/row.h" - -namespace hbase { -class Action { - public: - Action(std::shared_ptr<hbase::Row> action, int32_t original_index) - : action_(action), original_index_(original_index) {} - ~Action() {} - - int32_t original_index() const { return original_index_; } - - std::shared_ptr<hbase::Row> action() const { return action_; } - - private: - std::shared_ptr<hbase::Row> action_; - int32_t original_index_; - int64_t nonce_ = -1; - int32_t replica_id_ = -1; -}; - -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/append-test.cc b/hbase-native-client/core/append-test.cc deleted file mode 100644 index 2216034..0000000 --- a/hbase-native-client/core/append-test.cc +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <glog/logging.h> -#include <gtest/gtest.h> - -#include "core/append.h" -#include "core/mutation.h" -#include "utils/time-util.h" - -using hbase::Append; -using hbase::Cell; -using hbase::CellType; -using hbase::Mutation; -using hbase::TimeUtil; - -const constexpr int64_t Mutation::kLatestTimestamp; - -TEST(Append, Row) { - Append append{"foo"}; - EXPECT_EQ("foo", append.row()); -} - -TEST(Append, Durability) { - Append append{"row"}; - EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, append.Durability()); - - auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL; - append.SetDurability(skipWal); - EXPECT_EQ(skipWal, append.Durability()); -} - -TEST(Append, Timestamp) { - Append append{"row"}; - - // test default timestamp - EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp()); - - // set custom timestamp - auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); - append.SetTimeStamp(ts); - EXPECT_EQ(ts, append.TimeStamp()); - - // Add a column with custom timestamp - append.Add("f", "q", "v"); - auto &cell = append.FamilyMap().at("f")[0]; - EXPECT_EQ(ts, cell->Timestamp()); -} - -TEST(Append, HasFamilies) { - Append append{"row"}; - - EXPECT_EQ(false, append.HasFamilies()); - - append.Add("f", "q", "v"); - EXPECT_EQ(true, append.HasFamilies()); -} - -TEST(Append, Add) { - CellType cell_type = CellType::PUT; - std::string row = "row"; - std::string family = "family"; - std::string column = "column"; - std::string value = "value"; - int64_t timestamp = std::numeric_limits<int64_t>::max(); - auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); - - // add first cell - Append append{"row"}; - append.Add(std::move(cell)); - EXPECT_EQ(1, append.FamilyMap().size()); - EXPECT_EQ(1, append.FamilyMap().at(family).size()); - - // add a non-matching row - auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); - Append append2{"foo"}; - ASSERT_THROW(append2.Add(std::move(cell2)), std::runtime_error); // rows don't match - - // add a second cell with same family - auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type); - append.Add(std::move(cell3)); - EXPECT_EQ(1, append.FamilyMap().size()); - EXPECT_EQ(2, append.FamilyMap().at(family).size()); - - // add a cell to a different family - auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type); - append.Add(std::move(cell4)); - EXPECT_EQ(2, append.FamilyMap().size()); - EXPECT_EQ(1, append.FamilyMap().at("family-2").size()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/append.cc b/hbase-native-client/core/append.cc deleted file mode 100644 index 95349ae..0000000 --- a/hbase-native-client/core/append.cc +++ /dev/null @@ -1,53 +0,0 @@ - - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/append.h" -#include <folly/Conv.h> -#include <algorithm> -#include <limits> -#include <stdexcept> -#include <utility> - -namespace hbase { - -/** - * @brief Append to the column from the specific family with the specified qualifier - * @param family family name - * @param qualifier column qualifier - * @param value value to append - */ -Append& Append::Add(const std::string& family, const std::string& qualifier, - const std::string& value) { - family_map_[family].push_back(std::move( - std::make_unique<Cell>(row_, family, qualifier, timestamp_, value, hbase::CellType::PUT))); - return *this; -} -Append& Append::Add(std::unique_ptr<Cell> cell) { - if (cell->Row() != row_) { - throw std::runtime_error("The row in " + cell->DebugString() + - " doesn't match the original one " + row_); - } - - family_map_[cell->Family()].push_back(std::move(cell)); - return *this; -} - -} // namespace hbase