http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc new file mode 100644 index 0000000..d487bf5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/logging_test.cc @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <common/logging.h> +#include <bindings/c/hdfs.cc> + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <iostream> + +using namespace hdfs; + +struct log_state { + int trace_count; + int debug_count; + int info_count; + int warning_count; + int error_count; + + int origin_unknown; + int origin_rpc; + int origin_blockreader; + int origin_filehandle; + int origin_filesystem; + + std::string msg; + + log_state() { + reset(); + } + + void reset() { + trace_count = 0; + debug_count = 0; + info_count = 0; + warning_count = 0; + error_count = 0; + + origin_unknown = 0; + origin_rpc = 0; + origin_blockreader = 0; + origin_filehandle = 0; + origin_filesystem = 0; + + msg = ""; + } +}; +log_state log_state_instance; + +void process_log_msg(LogData *data) { + if(data->msg) + log_state_instance.msg = data->msg; + + switch(data->level) { + case HDFSPP_LOG_LEVEL_TRACE: + log_state_instance.trace_count++; + break; + case HDFSPP_LOG_LEVEL_DEBUG: + log_state_instance.debug_count++; + break; + case HDFSPP_LOG_LEVEL_INFO: + log_state_instance.info_count++; + break; + case HDFSPP_LOG_LEVEL_WARN: + log_state_instance.warning_count++; + break; + case HDFSPP_LOG_LEVEL_ERROR: + log_state_instance.error_count++; + break; + default: + //should never happen + std::cout << "foo" << std::endl; + ASSERT_FALSE(true); + } + + switch(data->component) { + case HDFSPP_LOG_COMPONENT_UNKNOWN: + log_state_instance.origin_unknown++; + break; + case HDFSPP_LOG_COMPONENT_RPC: + log_state_instance.origin_rpc++; + break; + case HDFSPP_LOG_COMPONENT_BLOCKREADER: + log_state_instance.origin_blockreader++; + break; + case HDFSPP_LOG_COMPONENT_FILEHANDLE: + log_state_instance.origin_filehandle++; + break; + case HDFSPP_LOG_COMPONENT_FILESYSTEM: + log_state_instance.origin_filesystem++; + break; + default: + std::cout << "bar" << std::endl; + ASSERT_FALSE(true); + } + +} + +void reset_log_counters() { + log_state_instance.reset(); +} + +void assert_nothing_logged() { + if(log_state_instance.trace_count || log_state_instance.debug_count || + log_state_instance.info_count || log_state_instance.warning_count || + log_state_instance.error_count) { + ASSERT_FALSE(true); + } +} + +void assert_trace_logged() { ASSERT_TRUE(log_state_instance.trace_count > 0); } +void assert_debug_logged() { ASSERT_TRUE(log_state_instance.debug_count > 0); } +void assert_info_logged() { ASSERT_TRUE(log_state_instance.info_count > 0); } +void assert_warning_logged() { ASSERT_TRUE(log_state_instance.warning_count > 0); } +void assert_error_logged() { ASSERT_TRUE(log_state_instance.error_count > 0); } + +void assert_no_trace_logged() { ASSERT_EQ(log_state_instance.trace_count, 0); } +void assert_no_debug_logged() { ASSERT_EQ(log_state_instance.debug_count, 0); } +void assert_no_info_logged() { ASSERT_EQ(log_state_instance.info_count, 0); } +void assert_no_warning_logged() { ASSERT_EQ(log_state_instance.warning_count, 0); } +void assert_no_error_logged() { ASSERT_EQ(log_state_instance.error_count, 0); } + +void assert_unknown_logged() { ASSERT_TRUE(log_state_instance.origin_unknown > 0); } +void assert_rpc_logged() { ASSERT_TRUE(log_state_instance.origin_rpc > 0); } +void assert_blockreader_logged() { ASSERT_TRUE(log_state_instance.origin_blockreader > 0); } +void assert_filehandle_logged() { ASSERT_TRUE(log_state_instance.origin_filehandle > 0); } +void assert_filesystem_logged() { ASSERT_TRUE(log_state_instance.origin_filesystem > 0); } + +void assert_no_unknown_logged() { ASSERT_EQ(log_state_instance.origin_unknown, 0); } +void assert_no_rpc_logged() { ASSERT_EQ(log_state_instance.origin_rpc, 0); } +void assert_no_blockreader_logged() { ASSERT_EQ(log_state_instance.origin_blockreader, 0); } +void assert_no_filehandle_logged() { ASSERT_EQ(log_state_instance.origin_filehandle, 0); } +void assert_no_filesystem_logged() { ASSERT_EQ(log_state_instance.origin_filesystem, 0); } + +void log_all_components_at_level(LogLevel lvl) { + if(lvl == kTrace) { + LOG_TRACE(kUnknown, << 'a'); + LOG_TRACE(kRPC, << 'b'); + LOG_TRACE(kBlockReader, << 'c'); + LOG_TRACE(kFileHandle, << 'd'); + LOG_TRACE(kFileSystem, << 'e'); + } else if (lvl == kDebug) { + LOG_DEBUG(kUnknown, << 'a'); + LOG_DEBUG(kRPC, << 'b'); + LOG_DEBUG(kBlockReader, << 'c'); + LOG_DEBUG(kFileHandle, << 'd'); + LOG_DEBUG(kFileSystem, << 'e'); + } else if (lvl == kInfo) { + LOG_INFO(kUnknown, << 'a'); + LOG_INFO(kRPC, << 'b'); + LOG_INFO(kBlockReader, << 'c'); + LOG_INFO(kFileHandle, << 'd'); + LOG_INFO(kFileSystem, << 'e'); + } else if (lvl == kWarning) { + LOG_WARN(kUnknown, << 'a'); + LOG_WARN(kRPC, << 'b'); + LOG_WARN(kBlockReader, << 'c'); + LOG_WARN(kFileHandle, << 'd'); + LOG_WARN(kFileSystem, << 'e'); + } else if (lvl == kError) { + LOG_ERROR(kUnknown, << 'a'); + LOG_ERROR(kRPC, << 'b'); + LOG_ERROR(kBlockReader, << 'c'); + LOG_ERROR(kFileHandle, << 'd'); + LOG_ERROR(kFileSystem, << 'e'); + } else { + // A level was added and not accounted for here + ASSERT_TRUE(false); + } +} + +// make sure everything can be masked +TEST(LoggingTest, MaskAll) { + LogManager::DisableLogForComponent(kUnknown); + LogManager::DisableLogForComponent(kRPC); + LogManager::DisableLogForComponent(kBlockReader); + LogManager::DisableLogForComponent(kFileHandle); + LogManager::DisableLogForComponent(kFileSystem); + + // use trace so anything that isn't masked should come through + LogManager::SetLogLevel(kTrace); + log_state_instance.reset(); + log_all_components_at_level(kError); + assert_nothing_logged(); + log_state_instance.reset(); +} + +// make sure components can be masked individually +TEST(LoggingTest, MaskOne) { + LogManager::DisableLogForComponent(kUnknown); + LogManager::DisableLogForComponent(kRPC); + LogManager::DisableLogForComponent(kBlockReader); + LogManager::DisableLogForComponent(kFileHandle); + LogManager::DisableLogForComponent(kFileSystem); + LogManager::SetLogLevel(kTrace); + + // Unknown - aka component not provided + LogManager::EnableLogForComponent(kUnknown); + log_all_components_at_level(kError); + assert_unknown_logged(); + assert_error_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kUnknown); + + // RPC + LogManager::EnableLogForComponent(kRPC); + log_all_components_at_level(kError); + assert_rpc_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kRPC); + + // BlockReader + LogManager::EnableLogForComponent(kBlockReader); + log_all_components_at_level(kError); + assert_blockreader_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_filehandle_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kBlockReader); + + // FileHandle + LogManager::EnableLogForComponent(kFileHandle); + log_all_components_at_level(kError); + assert_filehandle_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filesystem_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kFileHandle); + + // FileSystem + LogManager::EnableLogForComponent(kFileSystem); + log_all_components_at_level(kError); + assert_filesystem_logged(); + assert_error_logged(); + assert_no_unknown_logged(); + assert_no_rpc_logged(); + assert_no_blockreader_logged(); + assert_no_filehandle_logged(); + log_state_instance.reset(); + LogManager::DisableLogForComponent(kFileSystem); +} + +TEST(LoggingTest, Levels) { + // should be safe to focus on one component if MaskOne passes + LogManager::EnableLogForComponent(kUnknown); + LogManager::SetLogLevel(kError); + + LOG_TRACE(kUnknown, << "a"); + LOG_DEBUG(kUnknown, << "b"); + LOG_INFO(kUnknown,<< "c"); + LOG_WARN(kUnknown, << "d"); + assert_nothing_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + assert_unknown_logged(); + log_state_instance.reset(); + + // anything >= warning + LogManager::SetLogLevel(kWarning); + LOG_TRACE(kUnknown, << "a"); + LOG_DEBUG(kUnknown, << "b"); + LOG_INFO(kUnknown, << "c"); + assert_nothing_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything >= info + LogManager::SetLogLevel(kInfo); + LOG_TRACE(kUnknown, << "a"); + LOG_DEBUG(kUnknown, << "b"); + assert_nothing_logged(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything >= debug + LogManager::SetLogLevel(kDebug); + LOG_TRACE(kUnknown, << "a"); + assert_nothing_logged(); + LOG_DEBUG(kUnknown, << "b"); + assert_debug_logged(); + assert_no_info_logged(); + assert_no_warning_logged(); + assert_no_error_logged(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + assert_no_warning_logged(); + assert_no_error_logged(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + assert_no_error_logged(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); + log_state_instance.reset(); + + // anything + LogManager::SetLogLevel(kTrace); + assert_nothing_logged(); + LOG_TRACE(kUnknown, << "a"); + assert_trace_logged(); + log_state_instance.reset(); + LOG_DEBUG(kUnknown, << "b"); + assert_debug_logged(); + log_state_instance.reset(); + LOG_INFO(kUnknown, << "c"); + assert_info_logged(); + log_state_instance.reset(); + LOG_WARN(kUnknown, << "d"); + assert_warning_logged(); + log_state_instance.reset(); + LOG_ERROR(kUnknown, << "e"); + assert_error_logged(); +} + +TEST(LoggingTest, Text) { + LogManager::EnableLogForComponent(kRPC); + + std::string text; + LOG_ERROR(kRPC, << text); + + ASSERT_EQ(text, log_state_instance.msg); +} + + +int main(int argc, char *argv[]) { + CForwardingLogger *logger = new CForwardingLogger(); + logger->SetCallback(process_log_msg); + LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger)); + + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int res = RUN_ALL_TESTS(); + google::protobuf::ShutdownProtobufLibrary(); + return res; +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc new file mode 100644 index 0000000..1885eea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mock_connection.h" + +namespace hdfs { + +MockConnectionBase::MockConnectionBase(::asio::io_service *io_service) + : io_service_(io_service) +{} + +MockConnectionBase::~MockConnectionBase() {} + +ProducerResult SharedMockConnection::Produce() { + if (auto shared_prducer = shared_connection_data_.lock()) { + return shared_prducer->Produce(); + } else { + assert(false && "No producer registered"); + return std::make_pair(asio::error_code(), ""); + } +} + +std::weak_ptr<SharedConnectionData> SharedMockConnection::shared_connection_data_; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h new file mode 100644 index 0000000..cd1fc12 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_ +#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_ + +#include "common/async_stream.h" + +#include <asio/error_code.hpp> +#include <asio/buffer.hpp> +#include <asio/streambuf.hpp> +#include <asio/io_service.hpp> + +#include <gmock/gmock.h> + +namespace hdfs { + +typedef std::pair<asio::error_code, std::string> ProducerResult; +class AsioProducer { +public: + /* + * Return either: + * (::asio::error_code(), <some data>) for a good result + * (<an ::asio::error instance>, <anything>) to pass an error to the caller + * (::asio::error::would_block, <anything>) to block the next call forever + */ + + virtual ProducerResult Produce() = 0; +}; + + +class MockConnectionBase : public AsioProducer, public AsyncStream { +public: + MockConnectionBase(::asio::io_service *io_service); + virtual ~MockConnectionBase(); + typedef std::pair<asio::error_code, std::string> ProducerResult; + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + if (produced_.size() == 0) { + ProducerResult r = Produce(); + if (r.first == asio::error::would_block) { + return; // No more reads to do + } + if (r.first) { + io_service_->post(std::bind(handler, r.first, 0)); + return; + } + asio::mutable_buffers_1 data = produced_.prepare(r.second.size()); + asio::buffer_copy(data, asio::buffer(r.second)); + produced_.commit(r.second.size()); + } + + size_t len = std::min(asio::buffer_size(buf), produced_.size()); + asio::buffer_copy(buf, produced_.data()); + produced_.consume(len); + io_service_->post(std::bind(handler, asio::error_code(), len)); + } + + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + // CompletionResult res = OnWrite(buf); + io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf))); + } + + template <class Endpoint, class Callback> + void async_connect(const Endpoint &, Callback &&handler) { + io_service_->post([handler]() { handler(::asio::error_code()); }); + } + + virtual void cancel() {} + virtual void close() {} +protected: + virtual ProducerResult Produce() = 0; + ::asio::io_service *io_service_; + +private: + asio::streambuf produced_; +}; + + + + +class SharedConnectionData : public AsioProducer { + public: + bool checkProducerForConnect = false; + + MOCK_METHOD0(Produce, ProducerResult()); +}; + +class SharedMockConnection : public MockConnectionBase { +public: + using MockConnectionBase::MockConnectionBase; + + template <class Endpoint, class Callback> + void async_connect(const Endpoint &, Callback &&handler) { + auto data = shared_connection_data_.lock(); + assert(data); + + if (!data->checkProducerForConnect) { + io_service_->post([handler]() { handler(::asio::error_code()); }); + } else { + ProducerResult result = Produce(); + if (result.first == asio::error::would_block) { + return; // Connect will hang + } else { + io_service_->post([handler, result]() { handler( result.first); }); + } + } + } + + static void SetSharedConnectionData(std::shared_ptr<SharedConnectionData> new_producer) { + shared_connection_data_ = new_producer; // get a weak reference to it + } + +protected: + ProducerResult Produce() override; + + static std::weak_ptr<SharedConnectionData> shared_connection_data_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc new file mode 100644 index 0000000..d1212b9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/node_exclusion_test.cc @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fs/filesystem.h" +#include "fs/bad_datanode_tracker.h" + +#include <gmock/gmock.h> + +using ::testing::_; +using ::testing::InvokeArgument; +using ::testing::Return; + +using namespace hdfs; + +/** + * Unit test for the tracker + **/ + +/* make sure nodes can be added */ +TEST(NodeExclusionTest, AddBadNode) { + auto tracker = std::make_shared<BadDataNodeTracker>(); + + ASSERT_FALSE(tracker->IsBadNode("dn1")); + tracker->AddBadNode("dn1"); + ASSERT_TRUE(tracker->IsBadNode("dn1")); + ASSERT_FALSE(tracker->IsBadNode("dn2")); + tracker->AddBadNode("dn2"); + ASSERT_TRUE(tracker->IsBadNode("dn2")); +} + +/* Make sure nodes get removed when time elapses */ +TEST(NodeExclusionTest, RemoveOnTimeout) { + auto tracker = std::make_shared<BadDataNodeTracker>(); + + /* add node and make sure only that node is marked bad */ + std::string bad_dn("this_dn_died"); + tracker->AddBadNode(bad_dn); + ASSERT_TRUE(tracker->IsBadNode(bad_dn)); + ASSERT_FALSE(tracker->IsBadNode("good_dn")); + + tracker->TEST_set_clock_shift(1000000); + + /* node should be removed on lookup after time shift */ + ASSERT_FALSE(tracker->IsBadNode(bad_dn)); +} + +/** + * Unit tests for ExcludeSet + **/ + +TEST(NodeExclusionTest, ExcludeSet) { + /* empty case */ + auto exclude_set = std::make_shared<ExclusionSet>(std::set<std::string>()); + ASSERT_FALSE(exclude_set->IsBadNode("any_node")); + + /* common case */ + exclude_set = + std::make_shared<ExclusionSet>(std::set<std::string>({"dn_1", "dn_3"})); + ASSERT_TRUE(exclude_set->IsBadNode("dn_1")); + ASSERT_FALSE(exclude_set->IsBadNode("dn_2")); + ASSERT_TRUE(exclude_set->IsBadNode("dn_3")); +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int exit_code = RUN_ALL_TESTS(); + google::protobuf::ShutdownProtobufLibrary(); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc new file mode 100644 index 0000000..80127f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mock_connection.h" + +#include "datatransfer.pb.h" +#include "common/util.h" +#include "common/cancel_tracker.h" +#include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "reader/fileinfo.h" + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <iostream> + +using namespace hdfs; + +using ::hadoop::common::TokenProto; +using ::hadoop::hdfs::BlockOpResponseProto; +using ::hadoop::hdfs::ChecksumProto; +using ::hadoop::hdfs::DataTransferEncryptorMessageProto; +using ::hadoop::hdfs::ExtendedBlockProto; +using ::hadoop::hdfs::PacketHeaderProto; +using ::hadoop::hdfs::ReadOpChecksumInfoProto; +using ::hadoop::hdfs::LocatedBlockProto; +using ::hadoop::hdfs::LocatedBlocksProto; + +using ::asio::buffer; +using ::asio::error_code; +using ::asio::mutable_buffers_1; +using ::testing::_; +using ::testing::InvokeArgument; +using ::testing::Return; +using std::make_pair; +using std::string; + +namespace pb = ::google::protobuf; +namespace pbio = pb::io; + +namespace hdfs { + +class MockDNConnection : public MockConnectionBase, public DataNodeConnection{ +public: + MockDNConnection(::asio::io_service &io_service) + : MockConnectionBase(&io_service), OnRead([](){}) {} + MOCK_METHOD0(Produce, ProducerResult()); + + MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>)); + + /* event handler to trigger side effects */ + std::function<void(void)> OnRead; + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + this->OnRead(); + this->MockConnectionBase::async_read_some(buf, handler); + } + + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + this->MockConnectionBase::async_write_some(buf, handler); + } + + void Cancel() { + /* no-op, declared pure virtual */ + } +}; + +// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we +// can test the logic of AsyncReadBlock +class PartialMockReader : public BlockReaderImpl { +public: + PartialMockReader() : + BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>(), CancelTracker::New()) {}; + + MOCK_METHOD2( + AsyncReadPacket, + void(const asio::mutable_buffers_1 &, + const std::function<void(const Status &, size_t transferred)> &)); + + MOCK_METHOD5(AsyncRequestBlock, + void(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const std::function<void(Status)> &handler)); +}; + + +} + +static inline string ToDelimitedString(const pb::MessageLite *msg) { + string res; + res.reserve(hdfs::DelimitedPBMessageSize(msg)); + pbio::StringOutputStream os(&res); + pbio::CodedOutputStream out(&os); + out.WriteVarint32(msg->ByteSize()); + msg->SerializeToCodedStream(&out); + return res; +} + +static inline std::pair<error_code, string> Produce(const std::string &s) { + return make_pair(error_code(), s); +} + +static inline std::pair<error_code, string> ProducePacket( + const std::string &data, const std::string &checksum, int offset_in_block, + int seqno, bool last_packet) { + PacketHeaderProto proto; + proto.set_datalen(data.size()); + proto.set_offsetinblock(offset_in_block); + proto.set_seqno(seqno); + proto.set_lastpacketinblock(last_packet); + + char prefix[6]; + *reinterpret_cast<unsigned *>(prefix) = + htonl(data.size() + checksum.size() + sizeof(int32_t)); + *reinterpret_cast<short *>(prefix + sizeof(int32_t)) = + htons(proto.ByteSize()); + std::string payload(prefix, sizeof(prefix)); + payload.reserve(payload.size() + proto.ByteSize() + checksum.size() + + data.size()); + proto.AppendToString(&payload); + payload += checksum; + payload += data; + return std::make_pair(error_code(), std::move(payload)); +} + +TEST(RemoteBlockReaderTest, TestReadSingleTrunk) { + auto file_info = std::make_shared<struct FileInfo>(); + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .Times(4) + .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadError) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_FALSE(stat.ok()); + ASSERT_EQ(sizeof(buf) / 4 * 3, read); + read = 0; +} + +template <class Stream = MockDNConnection, class Handler> +static std::shared_ptr<BlockReaderImpl> +ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block, + uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, + const Handler &handler, CancelHandle cancel_handle = CancelTracker::New()) { + BlockReaderOptions options; + auto reader = std::make_shared<BlockReaderImpl>(options, conn, cancel_handle); + Status result; + reader->AsyncRequestBlock("libhdfs++", &block, length, offset, + [buf, reader, handler](const Status &stat) { + if (!stat.ok()) { + handler(stat, 0); + } else { + reader->AsyncReadPacket(buf, handler); + } + }); + return reader; +} + +TEST(RemoteBlockReaderTest, TestReadWholeBlock) { + static const size_t kChunkSize = 512; + static const string kChunkData(kChunkSize, 'a'); + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + bool done = false; + std::string data(kChunkSize, 0); + ReadContent(conn, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service, &done](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + done = true; + io_service.stop(); + }); + io_service.run(); + ASSERT_TRUE(done); +} + +/* used for cancelation tests, global to avoid cluttering capture lists */ +CancelHandle packet_canceller; + +TEST(RemoteBlockReaderTest, TestCancelWhileReceiving) { + packet_canceller = CancelTracker::New(); + + static const size_t kChunkSize = 512; + static const string kChunkData(kChunkSize, 'a'); + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + + /** + * async_read would normally get called 5 times here; once for each + * continuation in the pipeline. Cancel will be triggered on the + * fourth call to catch the pipeline mid-execution. + **/ + int call_count = 0; + int trigger_at_count = 4; + auto cancel_trigger = [&call_count, &trigger_at_count]() { + call_count += 1; + std::cout << "read called " << call_count << " times" << std::endl; + if(call_count == trigger_at_count) + packet_canceller->set_canceled(); + }; + + conn->OnRead = cancel_trigger; + + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + bool done = false; + std::string data(kChunkSize, 0); + ReadContent(conn, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service, &done](const Status &stat, size_t transferred) { + ASSERT_EQ(stat.code(), Status::kOperationCanceled); + ASSERT_EQ(0, transferred); + done = true; + io_service.stop(); + }, packet_canceller); + + io_service.run(); + ASSERT_TRUE(done); +} + +TEST(RemoteBlockReaderTest, TestReadWithinChunk) { + static const size_t kChunkSize = 1024; + static const size_t kLength = kChunkSize / 4 * 3; + static const size_t kOffset = kChunkSize / 4; + static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); + + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + ReadOpChecksumInfoProto *checksum_info = + block_op_resp.mutable_readopchecksuminfo(); + checksum_info->set_chunkoffset(0); + ChecksumProto *checksum = checksum_info->mutable_checksum(); + checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL); + checksum->set_bytesperchecksum(512); + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + bool done = false; + + string data(kLength, 0); + ReadContent(conn, block, data.size(), kOffset, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service,&done](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kLength, transferred); + ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); + done = true; + io_service.stop(); + }); + io_service.run(); + ASSERT_TRUE(done); +} + +TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { + static const size_t kChunkSize = 1024; + static const string kChunkData(kChunkSize, 'a'); + + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))) + .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + string data(kChunkSize, 0); + mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); + BlockReaderOptions options; + auto reader = std::make_shared<BlockReaderImpl>(options, conn, CancelTracker::New()); + Status result; + reader->AsyncRequestBlock( + "libhdfs++", &block, data.size(), 0, + [buf, reader, &data, &io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + reader->AsyncReadPacket( + buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + data.clear(); + data.resize(kChunkSize); + transferred = 0; + reader->AsyncReadPacket( + buf, [&data,&io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + }); + }); + io_service.run(); +} + +TEST(RemoteBlockReaderTest, TestReadCancelBetweenPackets) { + packet_canceller = CancelTracker::New(); + + static const size_t kChunkSize = 1024; + static const string kChunkData(kChunkSize, 'a'); + + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))); + /* the second AsyncReadPacket should never attempt to read */ + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + string data(kChunkSize, 0); + mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); + BlockReaderOptions options; + auto reader = std::make_shared<BlockReaderImpl>(options, conn, packet_canceller); + Status result; + reader->AsyncRequestBlock( + "libhdfs++", &block, data.size(), 0, + [buf, reader, &data, &io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + reader->AsyncReadPacket( + buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + data.clear(); + data.resize(kChunkSize); + transferred = 0; + + /* Cancel the operation.*/ + packet_canceller->set_canceled(); + + reader->AsyncReadPacket( + buf, [&data,&io_service](const Status &stat, size_t transferred) { + ASSERT_EQ(stat.code(), Status::kOperationCanceled); + ASSERT_EQ(0, transferred); + io_service.stop(); + }); + }); + }); + io_service.run(); +} + + +TEST(RemoteBlockReaderTest, TestSaslConnection) { + static const size_t kChunkSize = 512; + static const string kChunkData(kChunkSize, 'a'); + static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" + "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," + "charset=utf-8,algorithm=md5-sess"; + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1; + sasl_resp0.set_status( + ::hadoop::hdfs:: + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + sasl_resp0.set_payload(kAuthPayload); + sasl_resp1.set_status( + ::hadoop::hdfs:: + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) + .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); + + auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar"); + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + std::string data(kChunkSize, 0); + sasl_conn->Handshake([sasl_conn, &block, &data, &io_service]( + const Status &s) { + ASSERT_TRUE(s.ok()); + ReadContent(sasl_conn, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + }); + io_service.run(); +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int exit_code = RUN_ALL_TESTS(); + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/retry_policy_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/retry_policy_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/retry_policy_test.cc new file mode 100644 index 0000000..03d1b97 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/retry_policy_test.cc @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/retry_policy.h" + +#include <gmock/gmock.h> + +using namespace hdfs; + +TEST(RetryPolicyTest, TestNoRetry) { + NoRetryPolicy policy; + EXPECT_EQ(RetryAction::FAIL, policy.ShouldRetry(Status::Unimplemented(), 0, 0, true).action); +} + +TEST(RetryPolicyTest, TestFixedDelay) { + static const uint64_t DELAY = 100; + FixedDelayRetryPolicy policy(DELAY, 10); + + // No error + RetryAction result = policy.ShouldRetry(Status::Unimplemented(), 0, 0, true); + EXPECT_EQ(RetryAction::RETRY, result.action); + EXPECT_EQ(DELAY, result.delayMillis); + + // Few errors + result = policy.ShouldRetry(Status::Unimplemented(), 2, 2, true); + EXPECT_EQ(RetryAction::RETRY, result.action); + EXPECT_EQ(DELAY, result.delayMillis); + + result = policy.ShouldRetry(Status::Unimplemented(), 9, 0, true); + EXPECT_EQ(RetryAction::RETRY, result.action); + EXPECT_EQ(DELAY, result.delayMillis); + + // Too many errors + result = policy.ShouldRetry(Status::Unimplemented(), 10, 0, true); + EXPECT_EQ(RetryAction::FAIL, result.action); + EXPECT_TRUE(result.reason.size() > 0); // some error message + + result = policy.ShouldRetry(Status::Unimplemented(), 0, 10, true); + EXPECT_EQ(RetryAction::FAIL, result.action); + EXPECT_TRUE(result.reason.size() > 0); // some error message +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc new file mode 100644 index 0000000..f998c7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -0,0 +1,505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mock_connection.h" +#include "test.pb.h" +#include "RpcHeader.pb.h" +#include "rpc/rpc_connection_impl.h" +#include "common/namenode_info.h" + +#include <google/protobuf/io/coded_stream.h> + +#include <gmock/gmock.h> + +using ::hadoop::common::RpcResponseHeaderProto; +using ::hadoop::common::EmptyRequestProto; +using ::hadoop::common::EmptyResponseProto; +using ::hadoop::common::EchoRequestProto; +using ::hadoop::common::EchoResponseProto; + +using ::asio::error_code; + +using ::testing::Return; + +using ::std::make_pair; +using ::std::string; + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +namespace hdfs { + +std::vector<ResolvedNamenodeInfo> make_endpoint() { + ResolvedNamenodeInfo result; + result.endpoints.push_back(asio::ip::basic_endpoint<asio::ip::tcp>()); + return std::vector<ResolvedNamenodeInfo>({result}); +} + +class MockRPCConnection : public MockConnectionBase { + public: + MockRPCConnection(::asio::io_service &io_service) + : MockConnectionBase(&io_service) {} + MOCK_METHOD0(Produce, ProducerResult()); +}; + +class SharedMockRPCConnection : public SharedMockConnection { + public: + SharedMockRPCConnection(::asio::io_service &io_service) + : SharedMockConnection(&io_service) {} +}; + +class SharedConnectionEngine : public RpcEngine { + using RpcEngine::RpcEngine; + +protected: + std::shared_ptr<RpcConnection> NewConnection() override { + // Stuff in some dummy endpoints so we don't error out + last_endpoints_ = make_endpoint()[0].endpoints; + + return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(shared_from_this()); + } + +}; + +} + +static inline std::pair<error_code, string> RpcResponse( + const RpcResponseHeaderProto &h, const std::string &data, + const ::asio::error_code &ec = error_code()) { + uint32_t payload_length = + pbio::CodedOutputStream::VarintSize32(h.ByteSize()) + + pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() + + data.size(); + + std::string res; + res.resize(sizeof(uint32_t) + payload_length); + uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str())); + + buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray( + htonl(payload_length), buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf); + buf = h.SerializeWithCachedSizesToArray(buf); + buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf); + buf = pbio::CodedOutputStream::WriteStringToArray(data, buf); + + return std::make_pair(ec, std::move(res)); +} + + +using namespace hdfs; + +TEST(RpcEngineTest, TestRoundTrip) { + ::asio::io_service io_service; + Options options; + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); + conn->TEST_set_connected(true); + conn->StartReading(); + + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + + std::shared_ptr<RpcConnection> conn_ptr(conn); + engine->TEST_SetRpcConnection(conn_ptr); + + bool complete = false; + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ("foo", resp->message()); + complete = true; + io_service.stop(); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestConnectionResetAndFail) { + ::asio::io_service io_service; + Options options; + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); + conn->TEST_set_connected(true); + conn->StartReading(); + + bool complete = false; + + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) + .WillOnce(Return(RpcResponse( + h, "", make_error_code(::asio::error::connection_reset)))); + + std::shared_ptr<RpcConnection> conn_ptr(conn); + engine->TEST_SetRpcConnection(conn_ptr); + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + + engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_FALSE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + + +TEST(RpcEngineTest, TestConnectionResetAndRecover) { + ::asio::io_service io_service; + Options options; + options.max_rpc_retries = 1; + options.rpc_retry_delay_ms = 0; + std::shared_ptr<SharedConnectionEngine> engine + = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + + // Normally determined during RpcEngine::Connect, but in this case options + // provides enough info to determine policy here. + engine->TEST_SetRetryPolicy(engine->TEST_GenerateRetryPolicyUsingOptions()); + + + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + bool complete = false; + + auto producer = std::make_shared<SharedConnectionData>(); + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(RpcResponse( + h, "", make_error_code(::asio::error::connection_reset)))) + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + SharedMockConnection::SetSharedConnectionData(producer); + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + + engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) { + ::asio::io_service io_service; + Options options; + options.max_rpc_retries = 1; + options.rpc_retry_delay_ms = 1; + std::shared_ptr<SharedConnectionEngine> engine = + std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + + // Normally determined during RpcEngine::Connect, but in this case options + // provides enough info to determine policy here. + engine->TEST_SetRetryPolicy(engine->TEST_GenerateRetryPolicyUsingOptions()); + + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + bool complete = false; + + auto producer = std::make_shared<SharedConnectionData>(); + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(RpcResponse( + h, "", make_error_code(::asio::error::connection_reset)))) + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + SharedMockConnection::SetSharedConnectionData(producer); + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + + engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + + ::asio::deadline_timer timer(io_service); + timer.expires_from_now(std::chrono::hours(100)); + timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); + + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestConnectionFailure) +{ + auto producer = std::make_shared<SharedConnectionData>(); + producer->checkProducerForConnect = true; + SharedMockConnection::SetSharedConnectionData(producer); + + // Error and no retry + ::asio::io_service io_service; + + bool complete = false; + + Options options; + options.max_rpc_retries = 0; + options.rpc_retry_delay_ms = 0; + std::shared_ptr<SharedConnectionEngine> engine + = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); + + engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_FALSE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure) +{ + auto producer = std::make_shared<SharedConnectionData>(); + producer->checkProducerForConnect = true; + SharedMockConnection::SetSharedConnectionData(producer); + + ::asio::io_service io_service; + + bool complete = false; + + Options options; + options.max_rpc_retries = 2; + options.rpc_retry_delay_ms = 0; + std::shared_ptr<SharedConnectionEngine> engine = + std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); + + engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_FALSE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestConnectionFailureAndRecover) +{ + auto producer = std::make_shared<SharedConnectionData>(); + producer->checkProducerForConnect = true; + SharedMockConnection::SetSharedConnectionData(producer); + + ::asio::io_service io_service; + + bool complete = false; + + Options options; + options.max_rpc_retries = 1; + options.rpc_retry_delay_ms = 0; + std::shared_ptr<SharedConnectionEngine> engine = + std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) + .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); + + engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestEventCallbacks) +{ + ::asio::io_service io_service; + Options options; + options.max_rpc_retries = 99; + options.rpc_retry_delay_ms = 0; + std::shared_ptr<SharedConnectionEngine> engine = + std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + + // Normally determined during RpcEngine::Connect, but in this case options + // provides enough info to determine policy here. + engine->TEST_SetRetryPolicy(engine->TEST_GenerateRetryPolicyUsingOptions()); + + // Set up event callbacks + int calls = 0; + std::vector<std::string> callbacks; + engine->SetFsEventCallback([&calls, &callbacks] (const char * event, + const char * cluster, + int64_t value) { + (void)cluster; (void)value; + callbacks.push_back(event); + + // Allow connect and fail first read + calls++; + if (calls == 1 || calls == 3) // First connect and first read + return event_response::test_err(Status::Error("Test")); + + return event_response::make_ok(); + }); + + + + EchoResponseProto server_resp; + server_resp.set_message("foo"); + + auto producer = std::make_shared<SharedConnectionData>(); + producer->checkProducerForConnect = true; + RpcResponseHeaderProto h; + h.set_callid(1); + h.set_status(RpcResponseHeaderProto::SUCCESS); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted by callback + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) + .WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback + .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); + SharedMockConnection::SetSharedConnectionData(producer); + + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + + bool complete = false; + engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + + // If you're adding event hooks you'll most likely need to update this. + // It's a brittle test but makes it hard to miss control flow changes in RPC retry. + for(const auto& m : callbacks) + std::cerr << m << std::endl; + io_service.run(); + ASSERT_TRUE(complete); + ASSERT_EQ(9, callbacks.size()); + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error + ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[1]); // figure out retry decision + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[2]); // reconnect + ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[3]); // makes an error + ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[4]); // reconnect + for (int i=5; i < 8; i++) + ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]); +} + + + +TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) +{ + // Error and async recover + auto producer = std::make_shared<SharedConnectionData>(); + producer->checkProducerForConnect = true; + SharedMockConnection::SetSharedConnectionData(producer); + + ::asio::io_service io_service; + + bool complete = false; + + Options options; + options.max_rpc_retries = 1; + options.rpc_retry_delay_ms = 1; + std::shared_ptr<SharedConnectionEngine> engine = + std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + EXPECT_CALL(*producer, Produce()) + .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) + .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) + .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); + + engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_TRUE(stat.ok()); + }); + + ::asio::deadline_timer timer(io_service); + timer.expires_from_now(std::chrono::hours(100)); + timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); + + io_service.run(); + ASSERT_TRUE(complete); +} + +TEST(RpcEngineTest, TestTimeout) { + ::asio::io_service io_service; + Options options; + options.rpc_timeout = 1; + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + auto conn = + std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); + conn->TEST_set_connected(true); + conn->StartReading(); + + EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce()) + .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); + + std::shared_ptr<RpcConnection> conn_ptr(conn); + engine->TEST_SetRpcConnection(conn_ptr); + + bool complete = false; + + EchoRequestProto req; + req.set_message("foo"); + std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); + engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { + complete = true; + io_service.stop(); + ASSERT_FALSE(stat.ok()); + }); + + ::asio::deadline_timer timer(io_service); + timer.expires_from_now(std::chrono::hours(100)); + timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); + + io_service.run(); + ASSERT_TRUE(complete); +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int exit_code = RUN_ALL_TESTS(); + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc new file mode 100644 index 0000000..553ffa4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "common/sasl_authenticator.h" + +#include <gtest/gtest.h> +#include <google/protobuf/stubs/common.h> + +namespace hdfs { + +/** + * Testing whether the authenticator generates the MD5 digest correctly. + **/ +TEST(DigestMD5AuthenticatorTest, TestResponse) { + const std::string username = "igFLnEx4OIx5PZWHAAAABGhtYWkAAAAoQlAtMTM3MDQ2OTk" + "zLTE5Mi4xNjguMS4yMjctMTQyNDIyMDM4MTM2M4xAAAABAQ" + "RSRUFE"; + const std::string password = "K5IFUibAynVVrApeCXLrBk9Sro8="; + DigestMD5Authenticator auth(username, password, true); + auth.cnonce_ = "KQlJwBDTseCHpAkFLZls4WcAktp6r5wTzje5feLY"; + std::string result; + Status status = + auth.EvaluateResponse("realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" + "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\",charset=" + "utf-8,algorithm=md5-sess", + &result); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") != + std::string::npos); + + google::protobuf::ShutdownProtobufLibrary(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc new file mode 100644 index 0000000..97f0afd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/uri_test.cc @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfspp/uri.h" +#include <gmock/gmock.h> + +using ::testing::_; + +using namespace hdfs; + +URI expect_uri_throw(const char *uri) { + bool threw = false; + std::string what_msg; + URI val; + try { + val = URI::parse_from_string(uri); + } catch (const uri_parse_error& e) { + threw = true; + what_msg = e.what(); + } catch (...) { + threw = true; + } + + EXPECT_TRUE(threw); + EXPECT_EQ(what_msg, uri); + return val; +} + +URI expect_uri_nothrow(const char *uri) { + bool threw = false; + std::string what_msg; + URI val; + try { + val = URI::parse_from_string(uri); + } catch (const uri_parse_error& e) { + threw = true; + what_msg = e.what(); + } catch (...) { + threw = true; + } + + EXPECT_FALSE(threw); + EXPECT_EQ(what_msg, ""); + return val; +} + + +TEST(UriTest, TestDegenerateInputs) { + /* Empty input */ + expect_uri_nothrow(""); + + /* Invalid encoding */ + expect_uri_throw("%%"); + + /* Invalid port */ + expect_uri_throw("hdfs://nn:foo/"); + + /* Negative port */ + expect_uri_throw("hdfs://nn:-100/"); + + /* Empty paths */ + expect_uri_nothrow("hdfs://////"); +} + + +TEST(UriTest, TestNominalInputs) { + /* Simple input */ + { + URI uri = expect_uri_nothrow("hdfs:///foo"); + EXPECT_EQ("hdfs", uri.get_scheme()); + EXPECT_EQ("", uri.get_host()); + EXPECT_FALSE(uri.has_port()); + EXPECT_EQ(0, uri.get_port_or_default(0)); + EXPECT_EQ("/foo", uri.get_path()); + EXPECT_EQ("", uri.get_fragment()); + EXPECT_EQ("", uri.get_query()); + } + + /* With authority */ + { + URI uri = expect_uri_nothrow("hdfs://host:100/foo"); + EXPECT_EQ("hdfs", uri.get_scheme()); + EXPECT_EQ("host", uri.get_host()); + EXPECT_TRUE(uri.has_port()); + EXPECT_EQ(100, uri.get_port()); + EXPECT_EQ(100, uri.get_port_or_default(0)); + EXPECT_EQ("/foo", uri.get_path()); + EXPECT_EQ("", uri.get_fragment()); + EXPECT_EQ("", uri.get_query()); + } + + /* No scheme */ + { + URI uri = expect_uri_nothrow("/foo"); + EXPECT_EQ("", uri.get_scheme()); + EXPECT_EQ("", uri.get_host()); + EXPECT_FALSE(uri.has_port()); + EXPECT_EQ(0, uri.get_port_or_default(0)); + EXPECT_EQ("/foo", uri.get_path()); + EXPECT_EQ("", uri.get_fragment()); + EXPECT_EQ("", uri.get_query()); + } + + /* All fields */ + { + URI uri = expect_uri_nothrow("hdfs://nn:8020/path/to/data?a=b&c=d#fragment"); + EXPECT_EQ("hdfs", uri.get_scheme()); + EXPECT_EQ("nn", uri.get_host()); + EXPECT_TRUE(uri.has_port()); + EXPECT_EQ(8020, uri.get_port()); + EXPECT_EQ(8020, uri.get_port_or_default(0)); + EXPECT_EQ("/path/to/data", uri.get_path()); + EXPECT_EQ("a=b&c=d", uri.get_query()); + EXPECT_EQ(3, uri.get_path_elements().size()); + EXPECT_EQ("path", uri.get_path_elements()[0]); + EXPECT_EQ("to", uri.get_path_elements()[1]); + EXPECT_EQ("data", uri.get_path_elements()[2]); + EXPECT_EQ(2, uri.get_query_elements().size()); + EXPECT_EQ("a", uri.get_query_elements()[0].key); + EXPECT_EQ("b", uri.get_query_elements()[0].value); + EXPECT_EQ("c", uri.get_query_elements()[1].key); + EXPECT_EQ("d", uri.get_query_elements()[1].value); + EXPECT_EQ("fragment", uri.get_fragment()); + } +} + +TEST(UriTest, TestEncodedInputs) { + // Note that scheme and port cannot be uri-encoded + + /* Encoded input */ + { + URI uri = expect_uri_nothrow("S://%5E:1/+%5E%20?%5E=%5E#%5E"); + EXPECT_EQ("S", uri.get_scheme()); + EXPECT_EQ("^", uri.get_host()); + EXPECT_EQ(1, uri.get_port_or_default(0)); + EXPECT_EQ("/ ^ ", uri.get_path()); + EXPECT_EQ("^", uri.get_fragment()); + EXPECT_EQ("^=^", uri.get_query()); + } + + /* Lowercase */ + { + URI uri = expect_uri_nothrow("S://%5e:1/+%5e%20?%5e=%5e#%5e"); + EXPECT_EQ("S", uri.get_scheme()); + EXPECT_EQ("^", uri.get_host()); + EXPECT_EQ(1, uri.get_port_or_default(0)); + EXPECT_EQ("/ ^ ", uri.get_path()); + EXPECT_EQ("^", uri.get_fragment()); + EXPECT_EQ("^=^", uri.get_query()); + } +} + +TEST(UriTest, TestDecodedInputsAndOutputs) { + /* All fields non-encoded and shouldn't be interpreted */ + { + URI uri = expect_uri_nothrow("S://%25/%25+?%25=%25#%25"); + EXPECT_EQ("S", uri.get_scheme()); + EXPECT_EQ("%", uri.get_host()); + EXPECT_EQ(0, uri.get_port_or_default(0)); + EXPECT_EQ("/% ", uri.get_path()); + EXPECT_EQ("%", uri.get_fragment()); + EXPECT_EQ("%=%", uri.get_query()); + } + + /* All fields encode fields on their way out */ + { + URI uri = expect_uri_nothrow("S://%25/%25+?%25=%25#%25"); + EXPECT_EQ("S", uri.get_scheme(true)); + EXPECT_EQ("%25", uri.get_host(true)); + EXPECT_EQ(0, uri.get_port_or_default(0)); + EXPECT_EQ("/%25+", uri.get_path(true)); + EXPECT_EQ("%25", uri.get_fragment(true)); + EXPECT_EQ("%25=%25", uri.get_query(true)); + } + +} + +TEST(UriTest, TestSetters) { + + /* Non-encoded inputs */ + { + URI uri; + uri.set_scheme("S"); + uri.set_host("%"); + uri.set_port(100); + uri.set_path("%/%/%"); + uri.set_fragment("%"); + uri.set_query("%25=%25"); //set_query must always be encoded + EXPECT_EQ("S://%25:100/%25/%25/%25?%25=%25#%25", uri.str()); + } + + /* Incremental adders, non-encoded */ + { + URI uri; + uri.set_scheme("S"); + uri.set_host("%"); + uri.set_port(100); + uri.set_fragment("%"); + EXPECT_EQ("S://%25:100#%25", uri.str()); + + uri.add_path("%"); + uri.add_query("%", "%"); + EXPECT_EQ("S://%25:100/%25?%25=%25#%25", uri.str()); + + uri.add_path("%"); + uri.add_query("%", "%"); + EXPECT_EQ("S://%25:100/%25/%25?%25=%25&%25=%25#%25", uri.str()); + } + + /* Encoded inputs */ + { + URI uri; + uri.set_scheme("S", true); + uri.set_host("%25", true); + uri.set_port(100); + uri.set_path("%25/%25/%25", true); + uri.set_fragment("%25", true); + uri.set_query("%25=%25"); //set_query must always be encoded + EXPECT_EQ("S://%25:100/%25/%25/%25?%25=%25#%25", uri.str()); + } + + /* Incremental adders, encoded */ + { + URI uri; + uri.set_scheme("S", true); + uri.set_host("%25", true); + uri.set_port(100); + uri.set_fragment("%25", true); + EXPECT_EQ("S://%25:100#%25", uri.str()); + + uri.add_path("%25", true); + uri.add_query("%25", "%25", true); + EXPECT_EQ("S://%25:100/%25?%25=%25#%25", uri.str()); + + uri.add_path("%25", true); + uri.add_query("%25", "%25", true); + EXPECT_EQ("S://%25:100/%25/%25?%25=%25&%25=%25#%25", uri.str()); + } + +} + +TEST(UriTest, QueryManip) { + // Not encoded, just basic adding and removing query parts + { + URI uri = URI::parse_from_string("hdfs://nn:8020/path?thedude=lebowski&donny=outofhiselement"); + EXPECT_TRUE(uri.has_port()); + EXPECT_EQ(uri.get_query(), "thedude=lebowski&donny=outofhiselement"); + + std::vector<URI::Query> queries = uri.get_query_elements(); + EXPECT_EQ(queries.size(), 2); + EXPECT_EQ(queries[0].key, "thedude"); + EXPECT_EQ(queries[0].value, "lebowski"); + EXPECT_EQ(queries[1].key, "donny"); + EXPECT_EQ(queries[1].value, "outofhiselement"); + + uri.remove_query("donny"); // that's a bummer, man + EXPECT_EQ(uri.get_query(), "thedude=lebowski"); + queries = uri.get_query_elements(); + EXPECT_EQ(queries.size(), 1); + EXPECT_EQ(queries[0].key, "thedude"); + EXPECT_EQ(queries[0].value, "lebowski"); + + uri.add_query("HeyPeter", "CheckItOut"); + EXPECT_EQ(uri.get_query(), "thedude=lebowski&HeyPeter=CheckItOut"); + queries = uri.get_query_elements(); + EXPECT_EQ(queries.size(), 2); + } + +} + +int main(int argc, char *argv[]) { + /* + * The following line must be executed to initialize Google Mock + * (and Google Test) before running the tests. + */ + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc new file mode 100644 index 0000000..6df47b2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/user_lock_test.cc @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hdfspp/locks.h> + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <iostream> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> + +using namespace hdfs; + +// try_lock will always return false, unlock will always throw because it +// can never be locked. +class CantLockMutex : public Mutex { + public: + void lock() override { + throw LockFailure("This mutex cannot be locked"); + } + void unlock() override { + throw LockFailure("Unlock"); + } + std::string str() override { + return "CantLockMutex"; + } +}; + +TEST(UserLockTest, DefaultMutexBasics) { + Mutex *mtx = LockManager::TEST_get_default_mutex(); + + // lock and unlock twice to make sure unlock works + bool locked = false; + try { + mtx->lock(); + locked = true; + } catch (...) {} + EXPECT_TRUE(locked); + mtx->unlock(); + + locked = false; + try { + mtx->lock(); + locked = true; + } catch (...) {} + EXPECT_TRUE(locked); + mtx->unlock(); + + EXPECT_EQ(mtx->str(), "DefaultMutex"); +} + + +// Make sure lock manager can only be initialized once unless test reset called +TEST(UserLockTest, LockManager) { + std::unique_ptr<CantLockMutex> mtx(new CantLockMutex()); + EXPECT_TRUE(mtx != nullptr); + + // Check the default lock + Mutex *defaultGssapiMtx = LockManager::getGssapiMutex(); + EXPECT_TRUE(defaultGssapiMtx != nullptr); + + // Try a double init. Should not work + bool res = LockManager::InitLocks(mtx.get()); + EXPECT_TRUE(res); + + // Check pointer value + EXPECT_EQ(LockManager::getGssapiMutex(), mtx.get()); + + res = LockManager::InitLocks(mtx.get()); + EXPECT_FALSE(res); + + // Make sure test reset still works + LockManager::TEST_reset_manager(); + res = LockManager::InitLocks(mtx.get()); + EXPECT_TRUE(res); + LockManager::TEST_reset_manager(); + EXPECT_EQ(LockManager::getGssapiMutex(), defaultGssapiMtx); +} + +TEST(UserLockTest, CheckCantLockMutex) { + std::unique_ptr<CantLockMutex> mtx(new CantLockMutex()); + EXPECT_TRUE(mtx != nullptr); + + bool locked = false; + try { + mtx->lock(); + } catch (...) {} + EXPECT_FALSE(locked); + + bool threw_on_unlock = false; + try { + mtx->unlock(); + } catch (const LockFailure& e) { + threw_on_unlock = true; + } + EXPECT_TRUE(threw_on_unlock); + + EXPECT_EQ("CantLockMutex", mtx->str()); +} + +TEST(UserLockTest, LockGuardBasics) { + Mutex *goodMtx = LockManager::TEST_get_default_mutex(); + CantLockMutex badMtx; + + // lock/unlock a few times to increase chances of UB if lock is misused + for(int i=0;i<10;i++) { + bool caught_exception = false; + try { + LockGuard guard(goodMtx); + // now have a scoped lock + } catch (const LockFailure& e) { + caught_exception = true; + } + EXPECT_FALSE(caught_exception); + } + + // still do a few times, but expect it to blow up each time + for(int i=0;i<10;i++) { + bool caught_exception = false; + try { + LockGuard guard(&badMtx); + // now have a scoped lock + } catch (const LockFailure& e) { + caught_exception = true; + } + EXPECT_TRUE(caught_exception); + } + +} + +struct Incrementer { + int64_t& _val; + int64_t _iters; + Mutex *_mtx; + Incrementer(int64_t &val, int64_t iters, Mutex *m) + : _val(val), _iters(iters), _mtx(m) {} + void operator()(){ + for(int64_t i=0; i<_iters; i++) { + LockGuard valguard(_mtx); + _val += 1; + } + } +}; + +struct Decrementer { + int64_t& _val; + int64_t _iters; + Mutex *_mtx; + Decrementer(int64_t &val, int64_t iters, Mutex *m) + : _val(val), _iters(iters), _mtx(m) {} + void operator()(){ + for(int64_t i=0; i<_iters; i++) { + LockGuard valguard(_mtx); + _val -= 1; + } + } +}; + +TEST(UserLockTest, LockGuardConcurrency) { + Mutex *mtx = LockManager::TEST_get_default_mutex(); + + // Prove that these actually mutate the value + int64_t test_value = 0; + Incrementer inc(test_value, 1000, mtx); + inc(); + EXPECT_EQ(test_value, 1000); + + Decrementer dec(test_value, 1000, mtx); + dec(); + EXPECT_EQ(test_value, 0); + + std::vector<std::thread> workers; + std::vector<Incrementer> incrementers; + std::vector<Decrementer> decrementors; + + const int delta = 1024 * 1024; + const int threads = 2 * 6; + EXPECT_EQ(threads % 2, 0); + + // a bunch of threads race to increment and decrement the value + // if all goes well the operations balance out and the value is unchanged + for(int i=0; i < threads; i++) { + if(i%2 == 0) { + incrementers.emplace_back(test_value, delta, mtx); + workers.emplace_back(incrementers.back()); + } else { + decrementors.emplace_back(test_value, delta, mtx); + workers.emplace_back(decrementors.back()); + } + } + + // join, everything should balance to 0 + for(std::thread& thread : workers) { + thread.join(); + } + EXPECT_EQ(test_value, 0); +} + + +int main(int argc, char *argv[]) { + + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + int res = RUN_ALL_TESTS(); + return res; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING new file mode 100644 index 0000000..e86a381 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING @@ -0,0 +1,4 @@ +Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) + +Distributed under the Boost Software License, Version 1.0. (See accompanying +file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) http://git-wip-us.apache.org/repos/asf/hadoop/blob/b78c94f4/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp new file mode 100644 index 0000000..1f47840 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp @@ -0,0 +1,122 @@ +// +// asio.hpp +// ~~~~~~~~ +// +// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef ASIO_HPP +#define ASIO_HPP + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +# pragma once +#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) + +#include "asio/async_result.hpp" +#include "asio/basic_datagram_socket.hpp" +#include "asio/basic_deadline_timer.hpp" +#include "asio/basic_io_object.hpp" +#include "asio/basic_raw_socket.hpp" +#include "asio/basic_seq_packet_socket.hpp" +#include "asio/basic_serial_port.hpp" +#include "asio/basic_signal_set.hpp" +#include "asio/basic_socket_acceptor.hpp" +#include "asio/basic_socket_iostream.hpp" +#include "asio/basic_socket_streambuf.hpp" +#include "asio/basic_stream_socket.hpp" +#include "asio/basic_streambuf.hpp" +#include "asio/basic_waitable_timer.hpp" +#include "asio/buffer.hpp" +#include "asio/buffered_read_stream_fwd.hpp" +#include "asio/buffered_read_stream.hpp" +#include "asio/buffered_stream_fwd.hpp" +#include "asio/buffered_stream.hpp" +#include "asio/buffered_write_stream_fwd.hpp" +#include "asio/buffered_write_stream.hpp" +#include "asio/buffers_iterator.hpp" +#include "asio/completion_condition.hpp" +#include "asio/connect.hpp" +#include "asio/coroutine.hpp" +#include "asio/datagram_socket_service.hpp" +#include "asio/deadline_timer_service.hpp" +#include "asio/deadline_timer.hpp" +#include "asio/error.hpp" +#include "asio/error_code.hpp" +#include "asio/generic/basic_endpoint.hpp" +#include "asio/generic/datagram_protocol.hpp" +#include "asio/generic/raw_protocol.hpp" +#include "asio/generic/seq_packet_protocol.hpp" +#include "asio/generic/stream_protocol.hpp" +#include "asio/handler_alloc_hook.hpp" +#include "asio/handler_continuation_hook.hpp" +#include "asio/handler_invoke_hook.hpp" +#include "asio/handler_type.hpp" +#include "asio/io_service.hpp" +#include "asio/ip/address.hpp" +#include "asio/ip/address_v4.hpp" +#include "asio/ip/address_v6.hpp" +#include "asio/ip/basic_endpoint.hpp" +#include "asio/ip/basic_resolver.hpp" +#include "asio/ip/basic_resolver_entry.hpp" +#include "asio/ip/basic_resolver_iterator.hpp" +#include "asio/ip/basic_resolver_query.hpp" +#include "asio/ip/host_name.hpp" +#include "asio/ip/icmp.hpp" +#include "asio/ip/multicast.hpp" +#include "asio/ip/resolver_query_base.hpp" +#include "asio/ip/resolver_service.hpp" +#include "asio/ip/tcp.hpp" +#include "asio/ip/udp.hpp" +#include "asio/ip/unicast.hpp" +#include "asio/ip/v6_only.hpp" +#include "asio/is_read_buffered.hpp" +#include "asio/is_write_buffered.hpp" +#include "asio/local/basic_endpoint.hpp" +#include "asio/local/connect_pair.hpp" +#include "asio/local/datagram_protocol.hpp" +#include "asio/local/stream_protocol.hpp" +#include "asio/placeholders.hpp" +#include "asio/posix/basic_descriptor.hpp" +#include "asio/posix/basic_stream_descriptor.hpp" +#include "asio/posix/descriptor_base.hpp" +#include "asio/posix/stream_descriptor.hpp" +#include "asio/posix/stream_descriptor_service.hpp" +#include "asio/raw_socket_service.hpp" +#include "asio/read.hpp" +#include "asio/read_at.hpp" +#include "asio/read_until.hpp" +#include "asio/seq_packet_socket_service.hpp" +#include "asio/serial_port.hpp" +#include "asio/serial_port_base.hpp" +#include "asio/serial_port_service.hpp" +#include "asio/signal_set.hpp" +#include "asio/signal_set_service.hpp" +#include "asio/socket_acceptor_service.hpp" +#include "asio/socket_base.hpp" +#include "asio/strand.hpp" +#include "asio/stream_socket_service.hpp" +#include "asio/streambuf.hpp" +#include "asio/system_error.hpp" +#include "asio/thread.hpp" +#include "asio/time_traits.hpp" +#include "asio/version.hpp" +#include "asio/wait_traits.hpp" +#include "asio/waitable_timer_service.hpp" +#include "asio/windows/basic_handle.hpp" +#include "asio/windows/basic_object_handle.hpp" +#include "asio/windows/basic_random_access_handle.hpp" +#include "asio/windows/basic_stream_handle.hpp" +#include "asio/windows/object_handle.hpp" +#include "asio/windows/object_handle_service.hpp" +#include "asio/windows/overlapped_ptr.hpp" +#include "asio/windows/random_access_handle.hpp" +#include "asio/windows/random_access_handle_service.hpp" +#include "asio/windows/stream_handle.hpp" +#include "asio/windows/stream_handle_service.hpp" +#include "asio/write.hpp" +#include "asio/write_at.hpp" + +#endif // ASIO_HPP --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org