Added BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3789da72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3789da72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3789da72 Branch: refs/heads/decimal-type Commit: 3789da728e95b91bd97587f5e34da6ff1b55ea5f Parents: 4503198 Author: Zuyu Zhang <zzh...@pivotal.io> Authored: Sat May 28 14:24:13 2016 -0700 Committer: Zuyu Zhang <zzh...@pivotal.io> Committed: Wed Jun 8 11:57:46 2016 -0700 ---------------------------------------------------------------------- query_execution/BlockLocator.cpp | 223 +++++++++++++++ query_execution/BlockLocator.hpp | 125 +++++++++ query_execution/CMakeLists.txt | 48 ++++ query_execution/QueryExecutionMessages.proto | 34 +++ query_execution/QueryExecutionTypedefs.hpp | 16 ++ query_execution/tests/BlockLocator_unittest.cpp | 270 +++++++++++++++++++ storage/CMakeLists.txt | 9 +- storage/StorageManager.cpp | 190 ++++++++++++- storage/StorageManager.hpp | 76 +++++- 9 files changed, 984 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp new file mode 100644 index 0000000..6cf5249 --- /dev/null +++ b/query_execution/BlockLocator.cpp @@ -0,0 +1,223 @@ +/** + * Copyright 2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/BlockLocator.hpp" + +#include <cstdlib> +#include <string> +#include <utility> + +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "threading/ThreadUtil.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::free; +using std::malloc; +using std::move; + +using tmb::TaggedMessage; +using tmb::client_id; + +namespace quickstep { + +// Message loop of the BlockLocator thread: each received request is dispatched +// by its message type, and the loop returns when a kPoisonMessage arrives. +void BlockLocator::run() { + if (cpu_id_ >= 0) { + ThreadUtil::BindToCPU(cpu_id_); + } + + for (;;) { + // Receive() is a blocking call, causing this thread to sleep until next + // message is received.
+ const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + const client_id sender = annotated_message.sender; + LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type() + << "' message from TMB Client " << sender; + switch (tagged_message.message_type()) { + case kBlockDomainRegistrationMessage: { + serialization::BlockDomainRegistrationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processBlockDomainRegistrationMessage(sender, proto.domain_network_address()); + break; + } + case kAddBlockLocationMessage: { + serialization::BlockLocationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const block_id block = proto.block_id(); + const block_id_domain domain = proto.block_domain(); + + const auto result_block_locations = block_locations_[block].insert(domain); + const auto result_domain_blocks = domain_blocks_[domain].insert(block); + DCHECK_EQ(result_block_locations.second, result_domain_blocks.second); + + if (result_domain_blocks.second) { + LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain; + } else { + LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain; + } + break; + } + case kDeleteBlockLocationMessage: { + serialization::BlockLocationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const block_id block = proto.block_id(); + const block_id_domain domain = proto.block_domain(); + + // Use find() rather than operator[] so that deleting an unknown block + // does not insert an empty location set as a side effect. + const auto block_it = block_locations_.find(block); + if (block_it != block_locations_.end() && block_it->second.erase(domain) != 0u) { + if (block_it->second.empty()) { + // No domain holds the block any more; drop the entry so that + // evicted or deleted blocks do not accumulate in the map. + block_locations_.erase(block_it); + } + domain_blocks_[domain].erase(block); + + LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain; + } else { + LOG(INFO) <<
"Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain; + } + break; + } + case kLocateBlockMessage: { + serialization::BlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processLocateBlockMessage(sender, proto.block_id()); + break; + } + case kGetPeerDomainNetworkAddressesMessage: { + serialization::BlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id()); + break; + } + case kBlockDomainUnregistrationMessage: { + serialization::BlockDomainMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const block_id_domain domain = proto.block_domain(); + + domain_network_addresses_.erase(domain); + + // Remove the domain from the location set of every block it held and + // drop sets that become empty; find() avoids creating entries for a + // domain that was never registered. + const auto domain_it = domain_blocks_.find(domain); + if (domain_it != domain_blocks_.end()) { + for (const block_id block : domain_it->second) { + const auto block_it = block_locations_.find(block); + if (block_it != block_locations_.end()) { + block_it->second.erase(domain); + if (block_it->second.empty()) { + block_locations_.erase(block_it); + } + } + } + domain_blocks_.erase(domain_it); + } + + LOG(INFO) << "Unregistered Domain " << domain; + break; + } + case kPoisonMessage: { + return; + } + default: { + // Not expected: the constructor registers this client as a receiver + // only for the message types handled above. + LOG(WARNING) << "BlockLocator ignored the unexpected typed '" << tagged_message.message_type() + << "' message from TMB Client " << sender; + break; + } + } + } +} + +// Assigns the next block domain to the registering client, records its network +// address, and replies to 'receiver' with the assigned domain. +void BlockLocator::processBlockDomainRegistrationMessage(const client_id receiver, + const std::string &network_address) { + DCHECK_LT(block_domain_, kMaxDomain); + + domain_network_addresses_.emplace(++block_domain_, network_address); + domain_blocks_[block_domain_]; + + serialization::BlockDomainMessage proto; + proto.set_block_domain(block_domain_); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kBlockDomainRegistrationResponseMessage); + free(proto_bytes); + + LOG(INFO) << "BlockLocator (id '" << locator_client_id_ + << "') sent BlockDomainRegistrationResponseMessage (typed '" + << kBlockDomainRegistrationResponseMessage + << "') to Worker (id '" << receiver <<
"')"; + CHECK(tmb::MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, + locator_client_id_, + receiver, + move(message))); +} + +// Replies to 'receiver' with every domain currently recorded as holding +// 'block'; the list is empty for an unknown block. +void BlockLocator::processLocateBlockMessage(const client_id receiver, + const block_id block) { + serialization::LocateBlockResponseMessage proto; + + // Read-only lookup: find() does not insert an entry for an unknown block. + const auto block_it = block_locations_.find(block); + if (block_it != block_locations_.end()) { + for (const block_id_domain domain : block_it->second) { + proto.add_block_domains(domain); + } + } + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kLocateBlockResponseMessage); + free(proto_bytes); + + LOG(INFO) << "BlockLocator (id '" << locator_client_id_ + << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage + << "') to StorageManager (id '" << receiver << "')"; + CHECK(tmb::MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, + locator_client_id_, + receiver, + move(message))); +} + +// Replies to 'receiver' with the network address of every registered domain +// currently recorded as holding 'block'. +void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver, + const block_id block) { + serialization::GetPeerDomainNetworkAddressesResponseMessage proto; + + const auto block_it = block_locations_.find(block); + if (block_it != block_locations_.end()) { + for (const block_id_domain domain : block_it->second) { + // Skip a domain with no registered address instead of inserting (and + // reporting) an empty one via operator[]. + const auto address_it = domain_network_addresses_.find(domain); + if (address_it != domain_network_addresses_.end()) { + proto.add_domain_network_addresses(address_it->second); + } + } + } + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kGetPeerDomainNetworkAddressesResponseMessage); + free(proto_bytes); + + LOG(INFO) << "BlockLocator (id '" << locator_client_id_ + << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '" + << kGetPeerDomainNetworkAddressesResponseMessage + << "') to StorageManager (id '" << receiver << "')"; + CHECK(tmb::MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_, + locator_client_id_, + receiver, + move(message))); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/BlockLocator.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp new file mode 100644 index 0000000..bbd9b8f --- /dev/null +++ b/query_execution/BlockLocator.hpp @@ -0,0 +1,125 @@ +/** + * Copyright 2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ + +#include <atomic> +#include <string> +#include <unordered_map> +#include <unordered_set> + +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageConstants.hpp" +#include "threading/Thread.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" + +namespace quickstep { + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A class for keeping track of blocks loaded in a Worker's buffer pool + * in the distributed version. + **/ +class BlockLocator : public Thread { + public: + /** + * @brief Constructor. + * + * @param bus A pointer to the TMB.
+ * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned. + * + * @note If cpu_id is not specified, BlockLocator thread can be possibly moved + * around on different CPUs by the OS. + **/ + explicit BlockLocator(tmb::MessageBus *bus, + const int cpu_id = -1) + : bus_(DCHECK_NOTNULL(bus)), + cpu_id_(cpu_id), + block_domain_(0) { + locator_client_id_ = bus_->Connect(); + + // Register for every message type that run() receives and every response + // type that the process*() helpers send. + bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage); + bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage); + + bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage); + bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage); + + bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage); + bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage); + + bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage); + bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); + + bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage); + bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage); + } + + ~BlockLocator() override = default; + + /** + * @brief Get the TMB client ID of BlockLocator thread. + * + * @return TMB client ID of BlockLocator thread. + **/ + tmb::client_id getBusClientID() const { + return locator_client_id_; + } + + protected: + void run() override; + + private: + // Assigns a new block domain for 'network_address' and replies to 'receiver' with it. + void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address); + // Replies to 'receiver' with the domains that hold 'block'. + void processLocateBlockMessage(const tmb::client_id receiver, const block_id block); + // Replies to 'receiver' with the network addresses of the domains that hold 'block'. + void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block); + + tmb::MessageBus *bus_; + + // The ID of the CPU that the BlockLocator thread can optionally be pinned to.
+ const int cpu_id_; + + // The most recently assigned block domain; 0 until the first registration. + alignas(kCacheLineBytes) std::atomic<block_id_domain> block_domain_; + + // From a block domain to its network info in the ip:port format, e.g., + // "0.0.0.0:0". + std::unordered_map<block_id_domain, const std::string> domain_network_addresses_; + + // From a block to its domains. + std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_; + + // From a block domain to all blocks loaded in its buffer pool. + std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_; + + tmb::client_id locator_client_id_; + + DISALLOW_COPY_AND_ASSIGN(BlockLocator); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 04a0348..7d9d601 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -20,6 +20,9 @@ QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs QueryExecutionMessages.proto) # Declare micro-libs: +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) +endif() add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp) add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) @@ -40,6 +43,19 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp) # Link dependencies: +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_BlockLocator + glog + quickstep_queryexecution_QueryExecutionMessages_proto +
quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_threading_Thread + quickstep_threading_ThreadUtil + quickstep_utility_Macros + tmb) +endif() target_link_libraries(quickstep_queryexecution_Foreman glog gtest @@ -176,7 +192,37 @@ target_link_libraries(quickstep_queryexecution quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage quickstep_queryexecution_WorkerSelectionPolicy) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution + quickstep_queryexecution_BlockLocator) +endif() + # Tests: +if (ENABLE_DISTRIBUTED) + add_executable(BlockLocator_unittest + "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp") + target_link_libraries(BlockLocator_unittest + gflags_nothreads-static + glog + gtest + quickstep_catalog_CatalogAttribute + quickstep_catalog_CatalogRelation + quickstep_queryexecution_BlockLocator + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_storage_StorageBlob + quickstep_storage_StorageBlock + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_types_TypeFactory + quickstep_types_TypeID + tmb + ${LIBS}) + add_test(BlockLocator_unittest BlockLocator_unittest) +endif() + add_executable(Foreman_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp") target_link_libraries(Foreman_unittest @@ -269,3 +315,5 @@ target_link_libraries(WorkerSelectionPolicy_unittest quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerSelectionPolicy) add_test(WorkerSelectionPolicy_unittest WorkerSelectionPolicy_unittest) + +file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/block_locator_test_data/)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 8d2efd0..15803cf 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -16,6 +16,10 @@ syntax = "proto2"; package quickstep.serialization; +// Used for any messages that do not carry payloads. +message EmptyMessage { +} + // Used for both Normal WorkOrders and RebuildWorkOrders. // NOTE(zuyu): we might need to seperate the completion messages to contain // run-time information for Foreman to make better decisions on scheduling @@ -42,3 +46,33 @@ message DataPipelineMessage { message WorkOrdersAvailableMessage { required uint64 operator_index = 1; } + +// BlockLocator related messages. +message BlockDomainRegistrationMessage { + // Format IP:Port, e.g., "0.0.0.0:0". + required string domain_network_address = 1; +} + +// Used for RegistrationResponse, Unregistration, and (once it is added) +// FailureReport. +message BlockDomainMessage { + required uint32 block_domain = 1; +} + +// Used when StorageManager loads or evicts a block or a blob from its buffer +// pool.
+message BlockLocationMessage { + required fixed64 block_id = 1; + required uint32 block_domain = 2; +} + +// Used for both LocateBlockMessage and GetPeerDomainNetworkAddressesMessage. +message BlockMessage { + required fixed64 block_id = 1; +} + +// Response to LocateBlockMessage: every domain known to hold the block. +message LocateBlockResponseMessage { + repeated uint32 block_domains = 1; +} + +// Response to GetPeerDomainNetworkAddressesMessage: the network addresses of +// the domains known to hold the block. +message GetPeerDomainNetworkAddressesResponseMessage { + repeated string domain_network_addresses = 1; +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 36cfd82..fc253bc 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -18,6 +18,7 @@ #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_ #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_ +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED #include "threading/ThreadIDBasedMap.hpp" #include "tmb/address.h" @@ -55,6 +56,7 @@ using ClientIDMap = ThreadIDBasedMap<client_id, 'a', 'p'>; +// The following message types are sorted in the order of a query's life cycle. enum QueryExecutionMessageType : message_type_id { kWorkOrderMessage, // From Foreman to Worker. kWorkOrderCompleteMessage, // From Worker to Foreman. @@ -66,6 +68,20 @@ enum QueryExecutionMessageType : message_type_id { kRebuildWorkOrderMessage, // From Foreman to Worker. kRebuildWorkOrderCompleteMessage, // From Worker to Foreman. kPoisonMessage, // From the CLI shell to Foreman, then from Foreman to Workers. + +#ifdef QUICKSTEP_DISTRIBUTED + // BlockLocator related messages, sorted in the order of the life cycle of a + // StorageManager with a unique block domain. + kBlockDomainRegistrationMessage, // From Worker to BlockLocator. + kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker. + kAddBlockLocationMessage, // From StorageManager to BlockLocator.
+ kDeleteBlockLocationMessage, // From StorageManager to BlockLocator. + kLocateBlockMessage, // From StorageManager to BlockLocator. + kLocateBlockResponseMessage, // From BlockLocator to StorageManager. + kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator. + kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager. + kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator. +#endif }; /** @} */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/query_execution/tests/BlockLocator_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp new file mode 100644 index 0000000..fe7b86b --- /dev/null +++ b/query_execution/tests/BlockLocator_unittest.cpp @@ -0,0 +1,270 @@ +/** + * Copyright 2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ **/ + +#include <cstdlib> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "catalog/CatalogAttribute.hpp" +#include "catalog/CatalogRelation.hpp" +#include "query_execution/BlockLocator.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "storage/StorageBlob.hpp" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageConstants.hpp" +#include "storage/StorageManager.hpp" +#include "types/TypeFactory.hpp" +#include "types/TypeID.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "gtest/gtest.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::free; +using std::malloc; +using std::move; +using std::string; +using std::unique_ptr; +using std::vector; + +using tmb::AnnotatedMessage; +using tmb::MessageBus; +using tmb::TaggedMessage; + +namespace quickstep { + +// Fixture that starts a BlockLocator thread on the TMB, registers a block +// domain through a Worker client, and creates a StorageManager for that domain +// which is connected to the BlockLocator. +class BlockLocatorTest : public ::testing::Test { + protected: + static const char kStoragePath[]; + static const char kDomainNetworkAddress[]; + + ~BlockLocatorTest() { + // TearDown() sends the kPoisonMessage that makes the locator thread return. + locator_->join(); + } + + virtual void SetUp() { + bus_.Initialize(); + + locator_.reset(new BlockLocator(&bus_)); + locator_client_id_ = locator_->getBusClientID(); + locator_->start(); + + worker_client_id_ = bus_.Connect(); + + bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage); + bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage); + + bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage); + bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage); + + bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage); + + block_domain_ = getBlockDomain(kDomainNetworkAddress); + + storage_manager_.reset( + new StorageManager(kStoragePath, block_domain_,
locator_client_id_, &bus_)); + } + + virtual void TearDown() { + storage_manager_.reset(); + + serialization::EmptyMessage proto; + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kPoisonMessage); + free(proto_bytes); + + LOG(INFO) << "Worker (id '" << worker_client_id_ + << "') sent PoisonMessage (typed '" << kPoisonMessage + << "') to BlockLocator (id '" << locator_client_id_ << "')"; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, + worker_client_id_, + locator_client_id_, + move(message))); + } + + // Asks the BlockLocator directly (as the Worker client) which domains hold + // 'block', and returns them in response order. + vector<block_id_domain> getPeerDomains(const block_id block) { + serialization::BlockMessage proto; + proto.set_block_id(block); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kLocateBlockMessage); + free(proto_bytes); + + LOG(INFO) << "Worker (id '" << worker_client_id_ + << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage + << "') to BlockLocator"; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, + worker_client_id_, + locator_client_id_, + move(message))); + + const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true)); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type()); + LOG(INFO) << "Worker (id '" << worker_client_id_ + << "') received LocateBlockResponseMessage from BlockLocator"; + + serialization::LocateBlockResponseMessage response_proto; + CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); +
+ vector<block_id_domain> domains; + for (int i = 0; i < response_proto.block_domains_size(); ++i) { + domains.push_back(response_proto.block_domains(i)); + } + + return domains; + } + + // Checks that 'block' is reported as held by exactly this fixture's domain, + // both through the StorageManager and through a direct BlockLocator query. + void checkLoaded(const block_id block) { + const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block); + // ASSERT (not EXPECT) on the sizes: a failure must leave this helper before + // the [0] access below can read from an empty vector. + ASSERT_EQ(1u, peer_domain_network_addresses.size()); + EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data()); + + const vector<block_id_domain> domains = getPeerDomains(block); + ASSERT_EQ(1u, domains.size()); + EXPECT_EQ(block_domain_, domains[0]); + } + + // Checks that no domain is reported as holding 'block'. + void checkEvicted(const block_id block) { + const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block); + EXPECT_TRUE(peer_domain_network_addresses.empty()); + + const vector<block_id_domain> domains = getPeerDomains(block); + EXPECT_TRUE(domains.empty()); + } + + tmb::client_id worker_client_id_; + + block_id_domain block_domain_; + unique_ptr<StorageManager> storage_manager_; + + private: + // Registers 'network_address' with the BlockLocator and returns the block + // domain carried by the registration response. + block_id_domain getBlockDomain(const string &network_address) { + serialization::BlockDomainRegistrationMessage proto; + proto.set_domain_network_address(network_address); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kBlockDomainRegistrationMessage); + free(proto_bytes); + + LOG(INFO) << "Worker (id '" << worker_client_id_ + << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage + << "') to BlockLocator (id '" << locator_client_id_ << "')"; + + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, + worker_client_id_, + locator_client_id_, + move(message))); + + const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true)); + const
TaggedMessage &tagged_message = annotated_message.tagged_message; + EXPECT_EQ(locator_client_id_, annotated_message.sender); + EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); + LOG(INFO) << "Worker (id '" << worker_client_id_ + << "') received BlockDomainRegistrationResponseMessage from BlockLocator"; + + serialization::BlockDomainMessage response_proto; + CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + return static_cast<block_id_domain>(response_proto.block_domain()); + } + + MessageBusImpl bus_; + + unique_ptr<BlockLocator> locator_; + tmb::client_id locator_client_id_; +}; + +const char BlockLocatorTest::kStoragePath[] = "./block_locator_test_data/"; +const char BlockLocatorTest::kDomainNetworkAddress[] = "ip:port"; + +TEST_F(BlockLocatorTest, BlockTest) { + CatalogRelation relation(nullptr, "rel"); + relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt))); + + const block_id block = + storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout()); + checkLoaded(block); + + ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block)); + storage_manager_->evictBlockOrBlob(block); + checkEvicted(block); + + { + const BlockReference block_ref = storage_manager_->getBlock(block, relation); + } + checkLoaded(block); + + storage_manager_->deleteBlockOrBlobFile(block); + checkEvicted(block); +} + +TEST_F(BlockLocatorTest, BlobTest) { + const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots); + checkLoaded(blob); + + ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob)); + storage_manager_->evictBlockOrBlob(blob); + checkEvicted(blob); + + { + const BlobReference blob_ref = storage_manager_->getBlob(blob); + } + checkLoaded(blob); + + storage_manager_->deleteBlockOrBlobFile(blob); + checkEvicted(blob); +} + +} // namespace quickstep + +int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); + // Honor
FLAGS_buffer_pool_slots in StorageManager. + gflags::ParseCommandLineFlags(&argc, &argv, true); + + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 87a5e54..4da16ea 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -941,7 +941,8 @@ target_link_libraries(quickstep_storage_StorageManager quickstep_utility_Alignment quickstep_utility_CalculateInstalledMemory quickstep_utility_Macros - quickstep_utility_ShardedLockManager) + quickstep_utility_ShardedLockManager + tmb) if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) target_link_libraries(quickstep_storage_StorageManager quickstep_storage_FileManagerHdfs) @@ -950,6 +951,12 @@ if (QUICKSTEP_HAVE_LIBNUMA) target_link_libraries(quickstep_storage_StorageManager ${LIBNUMA_LIBRARY}) endif() +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_storage_StorageManager + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_storage_SubBlockTypeRegistry glog quickstep_storage_StorageBlockLayout_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp index 5d91052..15e2503 100644 --- a/storage/StorageManager.cpp +++ b/storage/StorageManager.cpp @@ -18,6 +18,7 @@ // This is included before other files so that we can conditionally determine // what else to include.
#include "catalog/CatalogConfig.h" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED #include "storage/StorageConfig.h" // Define feature test macros to enable large page support for mmap. @@ -52,6 +53,12 @@ #include <unordered_map> #include <vector> +#ifdef QUICKSTEP_DISTRIBUTED +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#endif + #include "storage/CountedReference.hpp" #include "storage/EvictionPolicy.hpp" #include "storage/FileManagerLocal.hpp" @@ -74,6 +81,13 @@ #include "gflags/gflags.h" #include "glog/logging.h" +#include "tmb/id_typedefs.h" + +#ifdef QUICKSTEP_DISTRIBUTED +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" +#endif + using std::free; using std::int32_t; using std::memset; @@ -81,6 +95,15 @@ using std::size_t; using std::string; using std::vector; +#ifdef QUICKSTEP_DISTRIBUTED +using std::malloc; +using std::move; +using std::unique_ptr; + +using tmb::MessageBus; +using tmb::TaggedMessage; +#endif + namespace quickstep { static bool ValidateBlockDomain(const char *flagname, @@ -157,14 +180,21 @@ DEFINE_bool(use_hdfs, false, "Use HDFS as the persistent storage, instead of the #endif StorageManager::StorageManager( - const std::string &storage_path, - const block_id_domain block_domain, - const size_t max_memory_usage, - EvictionPolicy *eviction_policy) + const std::string &storage_path, + const block_id_domain block_domain, + const size_t max_memory_usage, + EvictionPolicy *eviction_policy, + const tmb::client_id block_locator_client_id, + tmb::MessageBus *bus) : storage_path_(storage_path), total_memory_usage_(0), max_memory_usage_(max_memory_usage), - eviction_policy_(eviction_policy) { + eviction_policy_(eviction_policy), +#ifdef QUICKSTEP_DISTRIBUTED + block_domain_(block_domain), +#endif + block_locator_client_id_(block_locator_client_id), + bus_(bus) { #ifdef 
QUICKSTEP_HAVE_FILE_MANAGER_HDFS if (FLAGS_use_hdfs) { file_manager_.reset(new FileManagerHdfs(storage_path)); @@ -175,10 +205,55 @@ StorageManager::StorageManager( file_manager_.reset(new FileManagerLocal(storage_path)); #endif +#ifdef QUICKSTEP_DISTRIBUTED + // NOTE(zuyu): The following if-condition is a workaround to bypass code for + // the distributed version in some unittests that does not use TMB. The + // end-to-end functional tests for the distributed version, however, would not + // be affected. + if (bus_) { + storage_manager_client_id_ = bus_->Connect(); + + bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage); + bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); + + bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage); + bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage); + bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage); + + LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ + << "') starts with Domain " << block_domain; + } +#endif + block_index_ = BlockIdUtil::GetBlockId(block_domain, file_manager_->getMaxUsedBlockCounter(block_domain)); } StorageManager::~StorageManager() { +#ifdef QUICKSTEP_DISTRIBUTED + if (bus_) { + serialization::BlockDomainMessage proto; + proto.set_block_domain(block_domain_); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kBlockDomainUnregistrationMessage); + free(proto_bytes); + + LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ + << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage + << "') to BlockLocator"; + CHECK(MessageBus::SendStatus::kOK == 
+ QueryExecutionUtil::SendTMBMessage(bus_, + storage_manager_client_id_, + block_locator_client_id_, + move(message))); + } +#endif + for (std::unordered_map<block_id, BlockHandle>::iterator it = blocks_.begin(); it != blocks_.end(); ++it) { @@ -222,6 +297,12 @@ block_id StorageManager::createBlock(const CatalogRelationSchema &relation, // Make '*eviction_policy_' aware of the new block's existence. eviction_policy_->blockCreated(new_block_id); +#ifdef QUICKSTEP_DISTRIBUTED + if (bus_) { + sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage); + } +#endif + return new_block_id; } @@ -249,6 +330,12 @@ block_id StorageManager::createBlob(const std::size_t num_slots, // Make '*eviction_policy_' aware of the new blob's existence. eviction_policy_->blockCreated(new_block_id); +#ifdef QUICKSTEP_DISTRIBUTED + if (bus_) { + sendBlockLocationMessage(new_block_id, kAddBlockLocationMessage); + } +#endif + return new_block_id; } @@ -315,6 +402,12 @@ bool StorageManager::saveBlockOrBlob(const block_id block, const bool force) { } void StorageManager::evictBlockOrBlob(const block_id block) { +#ifdef QUICKSTEP_DISTRIBUTED + if (bus_) { + sendBlockLocationMessage(block, kDeleteBlockLocationMessage); + } +#endif + BlockHandle handle; { SpinSharedMutexExclusiveLock<false> write_lock(blocks_shared_mutex_); @@ -362,6 +455,87 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots, return ++block_index_; } +#ifdef QUICKSTEP_DISTRIBUTED +vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) { + serialization::BlockMessage proto; + proto.set_block_id(block); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kGetPeerDomainNetworkAddressesMessage); + free(proto_bytes); + + LOG(INFO) << "StorageManager (id '" << 
storage_manager_client_id_ + << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage + << "') to BlockLocator"; + + DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone); + DCHECK(bus_ != nullptr); + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, + storage_manager_client_id_, + block_locator_client_id_, + move(message))); + + const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true)); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + CHECK_EQ(block_locator_client_id_, annotated_message.sender); + CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type()); + LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ + << "') received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator"; + + serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto; + CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + vector<string> domain_network_addresses; + for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) { + domain_network_addresses.push_back(response_proto.domain_network_addresses(i)); + } + + return domain_network_addresses; +} + +void StorageManager::sendBlockLocationMessage(const block_id block, + const tmb::message_type_id message_type) { + switch (message_type) { + case kAddBlockLocationMessage: + LOG(INFO) << "Loaded Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_; + break; + case kDeleteBlockLocationMessage: + LOG(INFO) << "Evicted Block " << BlockIdUtil::ToString(block) << " in Domain " << block_domain_; + break; + default: + LOG(FATAL) << "Unknown message type " << message_type; + } + + serialization::BlockLocationMessage proto; + proto.set_block_id(block); + proto.set_block_domain(block_domain_); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = 
static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + message_type); + free(proto_bytes); + + LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ + << "') sent BlockLocationMessage (typed '" << message_type + << "') to BlockLocator"; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, + storage_manager_client_id_, + block_locator_client_id_, + move(message))); +} +#endif + StorageManager::BlockHandle StorageManager::loadBlockOrBlob( const block_id block, const int numa_node) { // The caller of this function holds an exclusive lock on this block/blob's @@ -378,6 +552,12 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob( loaded_handle.block_memory = block_buffer; loaded_handle.block_memory_size = num_slots; +#ifdef QUICKSTEP_DISTRIBUTED + if (bus_) { + sendBlockLocationMessage(block, kAddBlockLocationMessage); + } +#endif + return loaded_handle; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3789da72/storage/StorageManager.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp index 52326c2..55a011e 100644 --- a/storage/StorageManager.hpp +++ b/storage/StorageManager.hpp @@ -20,11 +20,14 @@ #include <atomic> #include <chrono> +#include <cstddef> #include <memory> #include <string> #include <unordered_map> #include <vector> +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED + #include "storage/CountedReference.hpp" #include "storage/EvictionPolicy.hpp" #include "storage/FileManager.hpp" @@ -40,6 +43,10 @@ #include "gflags/gflags.h" #include "gtest/gtest_prod.h" +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + namespace quickstep { DECLARE_int32(block_domain); @@ -50,6 +57,7 @@ DECLARE_bool(use_hdfs); #endif class 
CatalogRelationSchema; + class StorageBlockLayout; /** \addtogroup Storage @@ -104,6 +112,33 @@ class StorageManager { std::chrono::milliseconds(200))) { } +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Constructor. + * @param storage_path The filesystem directory where blocks have persistent + * storage. + * @param block_domain The unique block domain. + * @param block_locator_client_id The TMB client ID of the block locator. + * @param bus A pointer to the TMB. + * + * @exception CorruptPersistentStorage The storage directory layout is not + * in the expected format. + **/ + StorageManager(const std::string &storage_path, + const block_id_domain block_domain, + const tmb::client_id block_locator_client_id, + tmb::MessageBus *bus) + : StorageManager(storage_path, + block_domain, + FLAGS_buffer_pool_slots, + LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy( + 2, + std::chrono::milliseconds(200)), + block_locator_client_id, + bus) { + } +#endif + /** * @brief Constructor. * @param storage_path The filesystem directory where blocks have persistent @@ -121,13 +156,18 @@ class StorageManager { * @param eviction_policy The eviction policy that the storage manager should * use to manage the cache. The storage manager takes * ownership of *eviction_policy. + * @param block_locator_client_id The TMB client ID of the block locator. + * @param bus A pointer to the TMB. + * * @exception CorruptPersistentStorage The storage directory layout is not * in the expected format. **/ StorageManager(const std::string &storage_path, const block_id_domain block_domain, const size_t max_memory_usage, - EvictionPolicy *eviction_policy); + EvictionPolicy *eviction_policy, + const tmb::client_id block_locator_client_id = tmb::kClientIdNone, + tmb::MessageBus *bus = nullptr); /** * @brief Destructor which also destroys all managed blocks. 
@@ -332,6 +372,27 @@ class StorageManager { StorageBlockBase *block; }; +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Get the network info of all the remote StorageManagers which may + * load the given block in the buffer pool. + * + * @param block The block or blob to pull. + * + * @return The network info of all the possible peers to pull. + **/ + std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block); + + /** + * @brief Update the block location info in BlockLocator. + * + * @param block The given block or blob. + * @param message_type Indicate whether to add or delete a block location. + **/ + void sendBlockLocationMessage(const block_id block, + const tmb::message_type_id message_type); +#endif + // Helper for createBlock() and createBlob(). Allocates a block ID and memory // slots for a new StorageBlock or StorageBlob. Returns the allocated ID and // writes the allocated slot range into 'handle->slot_index_low' and @@ -459,6 +520,15 @@ class StorageManager { std::unique_ptr<EvictionPolicy> eviction_policy_; +#ifdef QUICKSTEP_DISTRIBUTED + const block_id_domain block_domain_; + + tmb::client_id storage_manager_client_id_; +#endif + + const tmb::client_id block_locator_client_id_; + tmb::MessageBus *bus_; + std::unique_ptr<FileManager> file_manager_; // Used to generate unique IDs in allocateNewBlockOrBlob(). @@ -486,6 +556,10 @@ class StorageManager { static constexpr std::size_t kLockManagerNumShards = 0x2000-1; ShardedLockManager<block_id, kLockManagerNumShards, SpinSharedMutex<false>> lock_manager_; + friend class BlockLocatorTest; + FRIEND_TEST(BlockLocatorTest, BlockTest); + FRIEND_TEST(BlockLocatorTest, BlobTest); + FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction); FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);