Use two TMB instances in Shiftboss: - a global TMB between Foreman and Shiftboss; - a local TMB between Shiftboss and its Workers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b88625d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b88625d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b88625d8 Branch: refs/heads/reorder-partitioned-hash-join Commit: b88625d80dbd7641e159a7c6bf959021fa2cac86 Parents: c9d1f22 Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Feb 8 12:48:31 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Fri Feb 24 11:15:41 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Executor.cpp | 7 +- cli/distributed/Executor.hpp | 4 + query_execution/Shiftboss.cpp | 419 +++++++++++-------- query_execution/Shiftboss.hpp | 92 +--- .../DistributedExecutionGeneratorTestRunner.cpp | 8 +- .../DistributedExecutionGeneratorTestRunner.hpp | 1 + 6 files changed, 279 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/cli/distributed/Executor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp index 3485298..e248fef 100644 --- a/cli/distributed/Executor.cpp +++ b/cli/distributed/Executor.cpp @@ -35,6 +35,7 @@ #include "tmb/id_typedefs.h" #include "tmb/native_net_client_message_bus.h" +#include "tmb/pure_memory_message_bus.h" #include "glog/logging.h" @@ -47,6 +48,8 @@ using tmb::client_id; namespace quickstep { void Executor::init() { + bus_local_.Initialize(); + executor_client_id_ = bus_.Connect(); DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_; @@ -59,7 +62,7 @@ void Executor::init() { for (std::size_t worker_thread_index = 0; worker_thread_index < FLAGS_num_workers; ++worker_thread_index) { - workers_.push_back(make_unique<Worker>(worker_thread_index, 
&bus_)); + workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_)); worker_client_ids.push_back(workers_.back()->getBusClientID()); } @@ -76,7 +79,7 @@ void Executor::init() { data_exchanger_.start(); shiftboss_ = - make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs()); + make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs()); shiftboss_->start(); for (const auto &worker : workers_) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/cli/distributed/Executor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp index 6ffa756..aafeeae 100644 --- a/cli/distributed/Executor.hpp +++ b/cli/distributed/Executor.hpp @@ -24,6 +24,7 @@ #include <vector> #include "cli/distributed/Role.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/Shiftboss.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" @@ -65,6 +66,9 @@ class Executor final : public Role { void run() override {} private: + // Used between Shiftboss and Workers. 
+ MessageBusImpl bus_local_; + tmb::client_id executor_client_id_; std::vector<std::unique_ptr<Worker>> workers_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index bae5205..2f7dc3c 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -64,6 +64,91 @@ namespace quickstep { class WorkOrder; +Shiftboss::Shiftboss(tmb::MessageBus *bus_global, + tmb::MessageBus *bus_local, + StorageManager *storage_manager, + WorkerDirectory *workers, + void *hdfs, + const int cpu_id) + : bus_global_(DCHECK_NOTNULL(bus_global)), + bus_local_(DCHECK_NOTNULL(bus_local)), + storage_manager_(DCHECK_NOTNULL(storage_manager)), + workers_(DCHECK_NOTNULL(workers)), + hdfs_(hdfs), + cpu_id_(cpu_id), + shiftboss_client_id_global_(tmb::kClientIdNone), + shiftboss_client_id_local_(tmb::kClientIdNone), + foreman_client_id_(tmb::kClientIdNone), + max_msgs_per_worker_(1), + start_worker_index_(0u) { + // Check to have at least one Worker. + DCHECK_GT(workers->getNumWorkers(), 0u); + +#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS + if (FLAGS_use_hdfs) { + CHECK(hdfs_); + } +#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS + + shiftboss_client_id_global_ = bus_global_->Connect(); + LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_; + DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone); + + shiftboss_client_id_local_ = bus_local_->Connect(); + DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone); + + // Messages between Foreman and Shiftboss. 
+ bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage); + + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage); + + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage); + + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage); + + // Message sent to Worker. + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage); + + // Forward the following message types from Foreman to Workers. + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage); + + // Forward the following message types from Workers to Foreman. 
+ bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage); + + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage); + + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage); + + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage); + + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage); + + // Clean up query execution states, i.e., QueryContext. + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage); + + // Stop itself. + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage); + // Stop all workers. + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage); + + for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) { + worker_addresses_.AddRecipient(workers_->getClientID(i)); + } + + registerWithForeman(); +} + void Shiftboss::run() { if (cpu_id_ >= 0) { // We can pin the shiftboss thread to a CPU if specified. @@ -73,159 +158,161 @@ void Shiftboss::run() { processShiftbossRegistrationResponseMessage(); for (;;) { - // Receive() is a blocking call, causing this thread to sleep until next - // message is received. 
- AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') received the typed '" << annotated_message.tagged_message.message_type() - << "' message from client " << annotated_message.sender; - switch (annotated_message.tagged_message.message_type()) { - case kQueryInitiateMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::QueryInitiateMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context()); - break; - } - case kWorkOrderMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::WorkOrderMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const std::size_t query_id = proto.query_id(); - DCHECK_EQ(1u, query_contexts_.count(query_id)); - - WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(), - shiftboss_index_, - &database_cache_, - query_contexts_[query_id].get(), - storage_manager_, - shiftboss_client_id_, - bus_, - hdfs_); - - unique_ptr<WorkerMessage> worker_message( - WorkerMessage::WorkOrderMessage(work_order, proto.operator_index())); - - TaggedMessage worker_tagged_message(worker_message.get(), - sizeof(*worker_message), - kWorkOrderMessage); - - const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage - << "') from Foreman to worker " << worker_index; - - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); - CHECK(send_status == 
MessageBus::SendStatus::kOK); - break; - } - case kInitiateRebuildMessage: { - // Construct rebuild work orders, and send back their number to - // 'ForemanDistributed'. - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::InitiateRebuildMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processInitiateRebuildMessage(proto.query_id(), - proto.operator_index(), - proto.insert_destination_index(), - proto.relation_id()); - break; - } - case kCatalogRelationNewBlockMessage: // Fall through. - case kDataPipelineMessage: - case kWorkOrderFeedbackMessage: - case kWorkOrderCompleteMessage: - case kRebuildWorkOrderCompleteMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded typed '" << annotated_message.tagged_message.message_type() - << "' message from Worker with TMB client ID '" << annotated_message.sender - << "' to Foreman with TMB client ID " << foreman_client_id_; - - DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kQueryTeardownMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; + AnnotatedMessage annotated_message; + if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') received the typed '" << annotated_message.tagged_message.message_type() + << "' message from Foreman " << annotated_message.sender; + switch (annotated_message.tagged_message.message_type()) { + case kQueryInitiateMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + 
serialization::QueryInitiateMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context()); + break; + } + case kWorkOrderMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::WorkOrderMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const std::size_t query_id = proto.query_id(); + DCHECK_EQ(1u, query_contexts_.count(query_id)); + + unique_ptr<WorkOrder> work_order( + WorkOrderFactory::ReconstructFromProto(proto.work_order(), shiftboss_index_, &database_cache_, + query_contexts_[query_id].get(), storage_manager_, + shiftboss_client_id_local_, bus_local_, hdfs_)); + + unique_ptr<WorkerMessage> worker_message( + WorkerMessage::WorkOrderMessage(work_order.release(), proto.operator_index())); + + TaggedMessage worker_tagged_message(worker_message.get(), + sizeof(*worker_message), + kWorkOrderMessage); + + const size_t worker_index = getSchedulableWorker(); + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_ + << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage + << "') from Foreman to worker " << worker_index; + + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_local_, + shiftboss_client_id_local_, + workers_->getClientID(worker_index), + move(worker_tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + case kInitiateRebuildMessage: { + // Construct rebuild work orders, and send back their number to + // 'ForemanDistributed'. 
+ const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::InitiateRebuildMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processInitiateRebuildMessage(proto.query_id(), + proto.operator_index(), + proto.insert_destination_index(), + proto.relation_id()); + break; + } + case kQueryTeardownMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; - serialization::QueryTeardownMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + serialization::QueryTeardownMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - query_contexts_.erase(proto.query_id()); - break; + query_contexts_.erase(proto.query_id()); + break; + } + case kSaveQueryResultMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::SaveQueryResultMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + for (int i = 0; i < proto.blocks_size(); ++i) { + storage_manager_->saveBlockOrBlob(proto.blocks(i)); + } + + // Clean up query execution states, i.e., QueryContext. 
+ query_contexts_.erase(proto.query_id()); + + serialization::SaveQueryResultResponseMessage proto_response; + proto_response.set_query_id(proto.query_id()); + proto_response.set_relation_id(proto.relation_id()); + proto_response.set_cli_id(proto.cli_id()); + proto_response.set_shiftboss_index(shiftboss_index_); + + const size_t proto_response_length = proto_response.ByteSize(); + char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); + CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); + + TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), + proto_response_length, + kSaveQueryResultResponseMessage); + free(proto_response_bytes); + + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage + << "') to Foreman with TMB client ID " << foreman_client_id_; + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, + foreman_client_id_, + move(message_response)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + case kPoisonMessage: { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') forwarded PoisonMessage (typed '" << kPoisonMessage + << "') from Foreman to all workers"; + + tmb::MessageStyle broadcast_style; + broadcast_style.Broadcast(true); + + const MessageBus::SendStatus send_status = + bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style, + move(annotated_message.tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + return; + } + default: { + LOG(FATAL) << "Unknown TMB message type"; + } } - case kSaveQueryResultMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::SaveQueryResultMessage proto; - 
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + } - for (int i = 0; i < proto.blocks_size(); ++i) { - storage_manager_->saveBlockOrBlob(proto.blocks(i)); + while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) { + switch (annotated_message.tagged_message.message_type()) { + case kCatalogRelationNewBlockMessage: + case kDataPipelineMessage: + case kWorkOrderFeedbackMessage: + case kWorkOrderCompleteMessage: + case kRebuildWorkOrderCompleteMessage: { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') forwarded typed '" << annotated_message.tagged_message.message_type() + << "' message from Worker with TMB client ID '" << annotated_message.sender + << "' to Foreman with TMB client ID " << foreman_client_id_; + + DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, + foreman_client_id_, + move(annotated_message.tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + default: { + LOG(FATAL) << "Unknown TMB message type"; } - - // Clean up query execution states, i.e., QueryContext. 
- query_contexts_.erase(proto.query_id()); - - serialization::SaveQueryResultResponseMessage proto_response; - proto_response.set_query_id(proto.query_id()); - proto_response.set_relation_id(proto.relation_id()); - proto_response.set_cli_id(proto.cli_id()); - proto_response.set_shiftboss_index(shiftboss_index_); - - const size_t proto_response_length = proto_response.ByteSize(); - char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); - CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); - - TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), - proto_response_length, - kSaveQueryResultResponseMessage); - free(proto_response_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kPoisonMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded PoisonMessage (typed '" << kPoisonMessage - << "') from Foreman to all workers"; - - tmb::MessageStyle broadcast_style; - broadcast_style.Broadcast(true); - - const MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, - worker_addresses_, - broadcast_style, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - return; - } - default: { - LOG(FATAL) << "Unknown TMB message type"; } } } @@ -265,21 +352,21 @@ void Shiftboss::registerWithForeman() { kShiftbossRegistrationMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " 
<< shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage << "') to all"; tmb::MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, all_addresses, style, move(message)); + bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message)); DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); } void Shiftboss::processShiftbossRegistrationResponseMessage() { - AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); + AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true)); const TaggedMessage &tagged_message = annotated_message.tagged_message; DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type()); foreman_client_id_ = annotated_message.sender; - DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_ << "') received the typed '" << kShiftbossRegistrationResponseMessage << "' message from ForemanDistributed with client " << foreman_client_id_; @@ -290,10 +377,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() { storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_); // Forward this message to Workers regarding <shiftboss_index_>. 
- QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_, + QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_, worker_addresses_, move(annotated_message.tagged_message), - bus_); + bus_local_); } void Shiftboss::processQueryInitiateMessage( @@ -303,7 +390,7 @@ void Shiftboss::processQueryInitiateMessage( database_cache_.update(catalog_database_cache_proto); auto query_context = std::make_unique<QueryContext>( - query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_); + query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_); query_contexts_.emplace(query_id, move(query_context)); serialization::QueryInitiateResponseMessage proto; @@ -318,12 +405,12 @@ void Shiftboss::processQueryInitiateMessage( kQueryInitiateResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, foreman_client_id_, move(message_response)); CHECK(send_status == MessageBus::SendStatus::kOK); @@ -357,12 +444,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kInitiateRebuildResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = - 
QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, foreman_client_id_, move(message_response)); CHECK(send_status == MessageBus::SendStatus::kOK); @@ -375,8 +462,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, move(partially_filled_block_refs[i]), op_index, rel_id, - shiftboss_client_id_, - bus_); + shiftboss_client_id_local_, + bus_local_); unique_ptr<WorkerMessage> worker_message( WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index)); @@ -386,13 +473,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kRebuildWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_ << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage << "') to worker " << worker_index; const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_local_, + shiftboss_client_id_local_, workers_->getClientID(worker_index), move(worker_tagged_message)); CHECK(send_status == MessageBus::SendStatus::kOK); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index e0b4312..05457bd 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -39,7 +39,8 @@ #include "tmb/address.h" #include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" + +namespace tmb { class MessageBus; }; namespace quickstep { @@ -63,7 +64,8 @@ class Shiftboss : public Thread { /** * @brief Constructor. * - * @param bus A pointer to the TMB. 
+ * @param bus_global A pointer to the TMB for Foreman. + * @param bus_local A pointer to the TMB for Workers. * @param storage_manager The StorageManager to use. * @param workers A pointer to the WorkerDirectory. * @param hdfs The HDFS connector via libhdfs3. @@ -72,84 +74,12 @@ class Shiftboss : public Thread { * @note If cpu_id is not specified, Shiftboss thread can be possibly moved * around on different CPUs by the OS. **/ - Shiftboss(tmb::MessageBus *bus, + Shiftboss(tmb::MessageBus *bus_global, + tmb::MessageBus *bus_local, StorageManager *storage_manager, WorkerDirectory *workers, void *hdfs, - const int cpu_id = -1) - : bus_(DCHECK_NOTNULL(bus)), - storage_manager_(DCHECK_NOTNULL(storage_manager)), - workers_(DCHECK_NOTNULL(workers)), - hdfs_(hdfs), - cpu_id_(cpu_id), - shiftboss_client_id_(tmb::kClientIdNone), - foreman_client_id_(tmb::kClientIdNone), - max_msgs_per_worker_(1), - start_worker_index_(0u) { - // Check to have at least one Worker. - DCHECK_GT(workers->getNumWorkers(), 0u); - -#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS - if (FLAGS_use_hdfs) { - CHECK(hdfs_); - } -#endif // QUICKSTEP_HAVE_FILE_MANAGER_HDFS - - shiftboss_client_id_ = bus_->Connect(); - LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_; - DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone); - - // Messages between Foreman and Shiftboss. 
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage); - - // Message sent to Worker. - bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage); - - // Forward the following message types from Foreman to Workers. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage); - - // Forward the following message types from Workers to Foreman. 
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage); - - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); - - // Clean up query execution states, i.e., QueryContext. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage); - - // Stop itself. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage); - // Stop all workers. - bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage); - - for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) { - worker_addresses_.AddRecipient(workers_->getClientID(i)); - } - - registerWithForeman(); - } + const int cpu_id = -1); ~Shiftboss() override { } @@ -160,7 +90,7 @@ class Shiftboss : public Thread { * @return TMB client ID of shiftboss thread. **/ inline tmb::client_id getBusClientID() const { - return shiftboss_client_id_; + return shiftboss_client_id_global_; } /** @@ -231,9 +161,7 @@ class Shiftboss : public Thread { const QueryContext::insert_destination_id dest_index, const relation_id rel_id); - // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss, - // and Shiftboss and Worker thread pool. 
- tmb::MessageBus *bus_; + tmb::MessageBus *bus_global_, *bus_local_; CatalogDatabaseCache database_cache_; StorageManager *storage_manager_; @@ -245,7 +173,7 @@ class Shiftboss : public Thread { // The ID of the CPU that the Shiftboss thread can optionally be pinned to. const int cpu_id_; - tmb::client_id shiftboss_client_id_, foreman_client_id_; + tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_; // Unique per Shiftboss instance. std::uint64_t shiftboss_index_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index c9f5a10..6bd7a1f 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption = DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path) : query_id_(0), + bus_locals_(kNumInstances), data_exchangers_(kNumInstances) { bus_.Initialize(); @@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner kAnyNUMANodeID); for (int i = 0; i < kNumInstances; ++i) { - workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_)); + tmb::MessageBus *bus_local = &bus_locals_[i]; + bus_local->Initialize(); + + workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local)); const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID()); worker_directories_.push_back( @@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner 
data_exchangers_[i].set_storage_manager(storage_manager.get()); shiftbosses_.push_back( - make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(), + make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get(), storage_manager->hdfs())); storage_managers_.push_back(move(storage_manager)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b88625d8/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index 63e320d..2cd2427 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { std::unique_ptr<ForemanDistributed> foreman_; + std::vector<MessageBusImpl> bus_locals_; std::vector<std::unique_ptr<Worker>> workers_; std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_; std::vector<DataExchangerAsync> data_exchangers_;