Renamed Foreman related classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4fb884c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4fb884c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4fb884c1 Branch: refs/heads/dist-exe-test-new Commit: 4fb884c1bef53b00ec3e0362b8de401b7c0b07f6 Parents: a37bf26 Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Jul 7 14:13:19 2016 -0500 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Fri Jul 29 16:42:10 2016 -0700 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- cli/CMakeLists.txt | 1 - cli/CommandExecutor.cpp | 1 - cli/CommandExecutor.hpp | 1 - cli/QuickstepCli.cpp | 21 +- cli/tests/CMakeLists.txt | 2 +- cli/tests/CommandExecutorTestRunner.cpp | 2 +- cli/tests/CommandExecutorTestRunner.hpp | 15 +- query_execution/CMakeLists.txt | 28 +- query_execution/Foreman.cpp | 255 ------------------ query_execution/Foreman.hpp | 140 ---------- query_execution/ForemanBase.hpp | 85 ++++++ query_execution/ForemanLite.hpp | 85 ------ query_execution/ForemanSingleNode.cpp | 256 +++++++++++++++++++ query_execution/ForemanSingleNode.hpp | 140 ++++++++++ query_optimizer/tests/CMakeLists.txt | 2 +- .../tests/ExecutionGeneratorTestRunner.cpp | 2 +- .../tests/ExecutionGeneratorTestRunner.hpp | 15 +- 18 files changed, 527 insertions(+), 526 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index de6754a..042c050 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -756,7 +756,7 @@ target_link_libraries(quickstep_cli_shell quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_Foreman + quickstep_queryexecution_ForemanSingleNode quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index 44ec223..9637055 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -89,7 +89,6 @@ target_link_libraries(quickstep_cli_CommandExecutor quickstep_cli_PrintToScreen quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper - quickstep_queryexecution_Foreman quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_QueryPlan quickstep_queryoptimizer_QueryProcessor http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.cpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp index 7083ef5..8acfae8 100644 --- a/cli/CommandExecutor.cpp +++ b/cli/CommandExecutor.cpp @@ -34,7 +34,6 @@ #include "parser/ParseStatement.hpp" #include "parser/ParseString.hpp" #include "parser/SqlParserWrapper.hpp" -#include "query_execution/Foreman.hpp" #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/QueryProcessor.hpp" http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/CommandExecutor.hpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp index 3435aeb..19d03e6 100644 --- a/cli/CommandExecutor.hpp +++ b/cli/CommandExecutor.hpp @@ -32,7 +32,6 @@ namespace tmb { class MessageBus; } namespace quickstep { class CatalogDatabase; -class Foreman; class ParseStatement; class QueryProcessor; class StorageManager; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/QuickstepCli.cpp ---------------------------------------------------------------------- diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp index 02a55a0..68a3599 100644 --- a/cli/QuickstepCli.cpp +++ b/cli/QuickstepCli.cpp @@ -58,7 +58,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "parser/ParseStatement.hpp" #include "parser/SqlParserWrapper.hpp" #include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/Foreman.hpp" +#include "query_execution/ForemanSingleNode.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" @@ -104,7 +104,7 @@ using quickstep::AdmitRequestMessage; using quickstep::CatalogRelation; using quickstep::DefaultsConfigurator; using quickstep::DropRelation; -using quickstep::Foreman; +using quickstep::ForemanSingleNode; using quickstep::InputParserUtil; using quickstep::MessageBusImpl; using quickstep::MessageStyle; @@ -353,14 +353,15 @@ int main(int argc, char* argv[]) { worker_client_ids, worker_numa_nodes); - Foreman foreman(main_thread_client_id, - &worker_directory, - &bus, - query_processor->getDefaultDatabase(), - query_processor->getStorageManager(), - -1, // Don't pin the Foreman thread. - num_numa_nodes_system, - quickstep::FLAGS_profile_and_report_workorder_perf); + ForemanSingleNode foreman( + main_thread_client_id, + &worker_directory, + &bus, + query_processor->getDefaultDatabase(), + query_processor->getStorageManager(), + -1, // Don't pin the Foreman thread. + num_numa_nodes_system, + quickstep::FLAGS_profile_and_report_workorder_perf); // Start the worker threads. for (Worker &worker : workers) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt index d177d6c..7da56d1 100644 --- a/cli/tests/CMakeLists.txt +++ b/cli/tests/CMakeLists.txt @@ -33,7 +33,7 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_Foreman + quickstep_queryexecution_ForemanSingleNode quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp index 794f7e1..bd7082f 100644 --- a/cli/tests/CommandExecutorTestRunner.cpp +++ b/cli/tests/CommandExecutorTestRunner.cpp @@ -27,7 +27,7 @@ #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" #include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/Foreman.hpp" +#include "query_execution/ForemanSingleNode.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/Worker.hpp" #include "query_optimizer/ExecutionGenerator.hpp" http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/cli/tests/CommandExecutorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp index 8fb5b65..69692ae 100644 --- a/cli/tests/CommandExecutorTestRunner.hpp +++ b/cli/tests/CommandExecutorTestRunner.hpp @@ -25,7 +25,7 @@ #include <vector> #include "parser/SqlParserWrapper.hpp" -#include "query_execution/Foreman.hpp" +#include "query_execution/ForemanSingleNode.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" @@ -77,11 +77,12 @@ class CommandExecutorTestRunner : public TextBasedTestRunner { workers_.reset(new WorkerDirectory(1 /* number of workers */, worker_client_ids, numa_nodes)); - foreman_.reset(new Foreman(main_thread_client_id_, - workers_.get(), - &bus_, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager())); + foreman_.reset( + new ForemanSingleNode(main_thread_client_id_, + workers_.get(), + &bus_, + test_database_loader_.catalog_database(), + test_database_loader_.storage_manager())); foreman_->start(); worker_->start(); @@ -104,7 +105,7 @@ class CommandExecutorTestRunner : public TextBasedTestRunner { tmb::client_id main_thread_client_id_; MessageBusImpl bus_; - std::unique_ptr<Foreman> foreman_; + std::unique_ptr<ForemanSingleNode> foreman_; std::unique_ptr<Worker> worker_; std::unique_ptr<WorkerDirectory> workers_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index b031a44..2be451c 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -1,6 +1,6 @@ # Copyright 2011-2015 Quickstep Technologies LLC. # Copyright 2015-2016 Pivotal Software, Inc. -# Copyright 2016, Quickstep Research Group, Computer Sciences Department, +# Copyright 2016, Quickstep Research Group, Computer Sciences Department, # University of WisconsinâMadison. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -32,8 +32,8 @@ if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) endif() add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp) -add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp) -add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp) +add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) +add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) add_library(quickstep_queryexecution_QueryContext_proto @@ -69,11 +69,15 @@ if (ENABLE_DISTRIBUTED) quickstep_utility_Macros tmb) endif() -target_link_libraries(quickstep_queryexecution_Foreman - ${GFLAGS_LIB_NAME} +target_link_libraries(quickstep_queryexecution_ForemanBase + glog + quickstep_threading_Thread + quickstep_utility_Macros + tmb) +target_link_libraries(quickstep_queryexecution_ForemanSingleNode glog quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_ForemanLite + quickstep_queryexecution_ForemanBase quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil @@ -82,12 +86,8 @@ target_link_libraries(quickstep_queryexecution_Foreman quickstep_threading_ThreadUtil quickstep_utility_EqualsAnyConstant quickstep_utility_Macros - tmb) -target_link_libraries(quickstep_queryexecution_ForemanLite - glog - quickstep_threading_Thread - quickstep_utility_Macros - tmb) + tmb + ${GFLAGS_LIB_NAME}) target_link_libraries(quickstep_queryexecution_PolicyEnforcer ${GFLAGS_LIB_NAME} glog @@ -199,8 +199,8 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_Foreman - quickstep_queryexecution_ForemanLite + quickstep_queryexecution_ForemanBase + quickstep_queryexecution_ForemanSingleNode quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryContext_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp deleted file mode 100644 index 98146e2..0000000 --- a/query_execution/Foreman.cpp +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Copyright 2011-2015 Quickstep Technologies LLC. - * Copyright 2015-2016 Pivotal Software, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#include "query_execution/Foreman.hpp" - -#include <cstddef> -#include <cstdio> -#include <memory> -#include <tuple> -#include <utility> -#include <vector> - -#include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/WorkerDirectory.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "threading/ThreadUtil.hpp" -#include "utility/EqualsAnyConstant.hpp" -#include "utility/Macros.hpp" - -#include "gflags/gflags.h" -#include "glog/logging.h" - -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::move; -using std::size_t; -using std::unique_ptr; -using std::vector; - -namespace quickstep { - -DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number " - "of pending work orders for the worker. This information is used " - "by the Foreman to assign work orders to worker threads"); - -Foreman::Foreman(const tmb::client_id main_thread_client_id, - WorkerDirectory *worker_directory, - tmb::MessageBus *bus, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - const int cpu_id, - const size_t num_numa_nodes, - const bool profile_individual_workorders) - : ForemanLite(bus, cpu_id), - main_thread_client_id_(main_thread_client_id), - worker_directory_(DCHECK_NOTNULL(worker_directory)), - catalog_database_(DCHECK_NOTNULL(catalog_database)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { - const std::vector<QueryExecutionMessageType> sender_message_types{ - kPoisonMessage, - kRebuildWorkOrderMessage, - kWorkOrderMessage, - kWorkloadCompletionMessage}; - - for (const auto message_type : sender_message_types) { - bus_->RegisterClientAsSender(foreman_client_id_, message_type); - } - - const std::vector<QueryExecutionMessageType> receiver_message_types{ - kAdmitRequestMessage, - kCatalogRelationNewBlockMessage, - kDataPipelineMessage, - kPoisonMessage, - kRebuildWorkOrderCompleteMessage, - kWorkOrderFeedbackMessage, - kWorkOrdersAvailableMessage, - kWorkOrderCompleteMessage}; - - for (const auto message_type : receiver_message_types) { - bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); - } - - policy_enforcer_.reset(new PolicyEnforcer( - foreman_client_id_, - num_numa_nodes, - catalog_database_, - storage_manager_, - worker_directory_, - bus_, - profile_individual_workorders)); -} - -void Foreman::run() { - if (cpu_id_ >= 0) { - // We can pin the foreman thread to a CPU if specified. - ThreadUtil::BindToCPU(cpu_id_); - } - - // Event loop - for (;;) { - // Receive() causes this thread to sleep until next message is received. - const AnnotatedMessage annotated_msg = - bus_->Receive(foreman_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_msg.tagged_message; - const tmb::message_type_id message_type = tagged_message.message_type(); - switch (message_type) { - case kCatalogRelationNewBlockMessage: // Fall through - case kDataPipelineMessage: - case kRebuildWorkOrderCompleteMessage: - case kWorkOrderCompleteMessage: - case kWorkOrderFeedbackMessage: - case kWorkOrdersAvailableMessage: { - policy_enforcer_->processMessage(tagged_message); - break; - } - - case kAdmitRequestMessage: { - const AdmitRequestMessage *msg = - static_cast<const AdmitRequestMessage *>(tagged_message.message()); - const vector<QueryHandle *> &query_handles = msg->getQueryHandles(); - - DCHECK(!query_handles.empty()); - bool all_queries_admitted = true; - if (query_handles.size() == 1u) { - all_queries_admitted = - policy_enforcer_->admitQuery(query_handles.front()); - } else { - all_queries_admitted = policy_enforcer_->admitQueries(query_handles); - } - if (!all_queries_admitted) { - LOG(WARNING) << "The scheduler could not admit all the queries"; - // TODO(harshad) - Inform the main thread about the failure. - } - break; - } - case kPoisonMessage: { - if (policy_enforcer_->hasQueries()) { - LOG(WARNING) << "Foreman thread exiting while some queries are " - "under execution or waiting to be admitted"; - } - return; - } - default: - LOG(FATAL) << "Unknown message type to Foreman"; - } - - if (canCollectNewMessages(message_type)) { - vector<unique_ptr<WorkerMessage>> new_messages; - policy_enforcer_->getWorkerMessages(&new_messages); - dispatchWorkerMessages(new_messages); - } - - // We check again, as some queries may produce zero work orders and finish - // their execution. - if (!policy_enforcer_->hasQueries()) { - // Signal the main thread that there are no queries to be executed. - // Currently the message doesn't have any real content. - const int dummy_payload = 0; - TaggedMessage completion_tagged_message( - &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage); - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage( - bus_, - foreman_client_id_, - main_thread_client_id_, - move(completion_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " - << foreman_client_id_ << " to main thread with TMB client ID" - << main_thread_client_id_; - } - } -} - -bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) { - if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, - kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage)) { - return false; - } else if (worker_directory_->getLeastLoadedWorker().second <= - FLAGS_min_load_per_worker) { - // If the least loaded worker has only one pending work order, we should - // collect new messages and dispatch them. - return true; - } else { - return false; - } -} - -void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) { - for (const auto &message : messages) { - DCHECK(message != nullptr); - const int recipient_worker_thread_index = message->getRecipientHint(); - if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) { - sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index), - *message); - worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index); - } else { - const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first; - sendWorkerMessage(least_loaded_worker_thread_index, *message); - worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index); - } - } -} - -void Foreman::sendWorkerMessage(const size_t worker_thread_index, - const WorkerMessage &message) { - tmb::message_type_id type; - if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) { - type = kRebuildWorkOrderMessage; - } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) { - type = kWorkOrderMessage; - } else { - FATAL_ERROR("Invalid WorkerMessageType"); - } - TaggedMessage worker_tagged_message(&message, sizeof(message), type); - - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - worker_directory_->getClientID(worker_thread_index), - move(worker_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << - "Message could not be sent from Foreman with TMB client ID " - << foreman_client_id_ << " to Foreman with TMB client ID " - << worker_directory_->getClientID(worker_thread_index); -} - -void Foreman::printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const { - const std::vector< - std::tuple<std::size_t, std::size_t, std::size_t>> - &recorded_times = policy_enforcer_->getProfilingResults(query_id); - fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out); - for (auto workorder_entry : recorded_times) { - // Note: Index of the "worker thread index" in the tuple is 0. - const std::size_t worker_id = std::get<0>(workorder_entry); - fprintf(out, - "%lu,%lu,%d,%lu,%lu\n", - query_id, - worker_id, - worker_directory_->getNUMANode(worker_id), - std::get<1>(workorder_entry), // Operator ID. - std::get<2>(workorder_entry)); // Time. - } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/Foreman.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp deleted file mode 100644 index 7be57e7..0000000 --- a/query_execution/Foreman.hpp +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Copyright 2011-2015 Quickstep Technologies LLC. - * Copyright 2015-2016 Pivotal Software, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ - -#include <cstddef> -#include <cstdio> -#include <memory> -#include <vector> - -#include "query_execution/ForemanLite.hpp" -#include "query_execution/PolicyEnforcer.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" - -namespace quickstep { - -class CatalogDatabaseLite; -class StorageManager; -class WorkerDirectory; -class WorkerMessage; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief The Foreman receives queries from the main thread, messages from the - * policy enforcer and dispatches the work to worker threads. It also - * receives work completion messages from workers. - **/ -class Foreman final : public ForemanLite { - public: - /** - * @brief Constructor. - * - * @param main_thread_client_id The TMB client ID of the main thread. - * @param worker_directory The worker directory. - * @param bus A pointer to the TMB. - * @param catalog_database The catalog database where this query is executed. - * @param storage_manager The StorageManager to use. - * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. - * @param num_numa_nodes The number of NUMA nodes in the system. - * @param profile_individual_workorders Whether every workorder's execution - * be profiled or not. - * - * @note If cpu_id is not specified, Foreman thread can be possibly moved - * around on different CPUs by the OS. - **/ - Foreman(const tmb::client_id main_thread_client_id, - WorkerDirectory *worker_directory, - tmb::MessageBus *bus, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - const int cpu_id = -1, - const std::size_t num_numa_nodes = 1, - const bool profile_individual_workorders = false); - - ~Foreman() override {} - - /** - * @brief Print the results of profiling individual work orders for a given - * query. - * - * TODO(harshad) - Add the name of the operator to the output. - * TODO(harshad) - Add the CPU core ID of the operator to the output. This - * will require modifying the WorkerDirectory to remember worker affinities. - * Until then, the users can refer to the worker_affinities provided to the - * cli to infer the CPU core ID where a given worker is pinned. - * - * @param query_id The ID of the query for which the results are to be printed. - * @param out The file stream. - **/ - void printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const; - - protected: - void run() override; - - private: - /** - * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the - * worker threads. - * - * @param messages The messages to be dispatched. - **/ - void dispatchWorkerMessages( - const std::vector<std::unique_ptr<WorkerMessage>> &messages); - - /** - * @brief Send the given message to the specified worker. - * - * @param worker_thread_index The logical index of the recipient worker thread - * in WorkerDirectory. - * @param message The WorkerMessage to be sent. - **/ - void sendWorkerMessage(const std::size_t worker_thread_index, - const WorkerMessage &message); - - /** - * @brief Check if we can collect new messages from the PolicyEnforcer. - * - * @param message_type The type of the last received message. - **/ - bool canCollectNewMessages(const tmb::message_type_id message_type); - - const tmb::client_id main_thread_client_id_; - - WorkerDirectory *worker_directory_; - - CatalogDatabaseLite *catalog_database_; - StorageManager *storage_manager_; - - std::unique_ptr<PolicyEnforcer> policy_enforcer_; - - DISALLOW_COPY_AND_ASSIGN(Foreman); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp new file mode 100644 index 0000000..274b8fc --- /dev/null +++ b/query_execution/ForemanBase.hpp @@ -0,0 +1,85 @@ +/** + * Copyright 2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ + +#include "threading/Thread.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" + +namespace quickstep { + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A base class that Foreman implements. This class is used to derive + * for implementations for both the single-node and distributed versions. + **/ +class ForemanBase : public Thread { + public: + /** + * @brief Constructor. + * + * @param bus A pointer to the TMB. + * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. + * + * @note If cpu_id is not specified, Foreman thread can be possibly moved + * around on different CPUs by the OS. + **/ + ForemanBase(tmb::MessageBus *bus, + const int cpu_id) + : bus_(DCHECK_NOTNULL(bus)), + cpu_id_(cpu_id) { + foreman_client_id_ = bus_->Connect(); + } + + ~ForemanBase() override {} + + /** + * @brief Get the TMB client ID of Foreman thread. + * + * @return TMB client ID of foreman thread. + **/ + tmb::client_id getBusClientID() const { + return foreman_client_id_; + } + + protected: + void run() override = 0; + + tmb::MessageBus *bus_; + + tmb::client_id foreman_client_id_; + + // The ID of the CPU that the Foreman thread can optionally be pinned to. + const int cpu_id_; + + private: + DISALLOW_COPY_AND_ASSIGN(ForemanBase); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanLite.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanLite.hpp b/query_execution/ForemanLite.hpp deleted file mode 100644 index cb6cdf3..0000000 --- a/query_execution/ForemanLite.hpp +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright 2016 Pivotal Software, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_ - -#include "threading/Thread.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" - -namespace quickstep { - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A base class that Foreman implements. This class is used to derive - * for implementations for both the single-node and distributed versions. - **/ -class ForemanLite : public Thread { - public: - /** - * @brief Constructor. - * - * @param bus A pointer to the TMB. - * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. - * - * @note If cpu_id is not specified, Foreman thread can be possibly moved - * around on different CPUs by the OS. - **/ - ForemanLite(tmb::MessageBus *bus, - const int cpu_id) - : bus_(DCHECK_NOTNULL(bus)), - cpu_id_(cpu_id) { - foreman_client_id_ = bus_->Connect(); - } - - ~ForemanLite() override {} - - /** - * @brief Get the TMB client ID of Foreman thread. - * - * @return TMB client ID of foreman thread. - **/ - tmb::client_id getBusClientID() const { - return foreman_client_id_; - } - - protected: - void run() override = 0; - - tmb::MessageBus *bus_; - - tmb::client_id foreman_client_id_; - - // The ID of the CPU that the Foreman thread can optionally be pinned to. - const int cpu_id_; - - private: - DISALLOW_COPY_AND_ASSIGN(ForemanLite); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_LITE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp new file mode 100644 index 0000000..3aa1f0b --- /dev/null +++ b/query_execution/ForemanSingleNode.cpp @@ -0,0 +1,256 @@ +/** + * Copyright 2011-2015 Quickstep Technologies LLC. + * Copyright 2015-2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/ForemanSingleNode.hpp" + +#include <cstddef> +#include <cstdio> +#include <memory> +#include <tuple> +#include <utility> +#include <vector> + +#include "query_execution/AdmitRequestMessage.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/WorkerDirectory.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "threading/ThreadUtil.hpp" +#include "utility/EqualsAnyConstant.hpp" +#include "utility/Macros.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +namespace quickstep { + +DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number " + "of pending work orders for the worker. This information is used " + "by the Foreman to assign work orders to worker threads"); + +ForemanSingleNode::ForemanSingleNode( + const tmb::client_id main_thread_client_id, + WorkerDirectory *worker_directory, + tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + const int cpu_id, + const size_t num_numa_nodes, + const bool profile_individual_workorders) + : ForemanBase(bus, cpu_id), + main_thread_client_id_(main_thread_client_id), + worker_directory_(DCHECK_NOTNULL(worker_directory)), + catalog_database_(DCHECK_NOTNULL(catalog_database)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) { + const std::vector<QueryExecutionMessageType> sender_message_types{ + kPoisonMessage, + kRebuildWorkOrderMessage, + kWorkOrderMessage, + kWorkloadCompletionMessage}; + + for (const auto message_type : sender_message_types) { + bus_->RegisterClientAsSender(foreman_client_id_, message_type); + } + + const std::vector<QueryExecutionMessageType> receiver_message_types{ + kAdmitRequestMessage, + kCatalogRelationNewBlockMessage, + kDataPipelineMessage, + kPoisonMessage, + kRebuildWorkOrderCompleteMessage, + kWorkOrderFeedbackMessage, + kWorkOrdersAvailableMessage, + kWorkOrderCompleteMessage}; + + for (const auto message_type : receiver_message_types) { + bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); + } + + policy_enforcer_.reset(new PolicyEnforcer( + foreman_client_id_, + num_numa_nodes, + catalog_database_, + storage_manager_, + worker_directory_, + bus_, + profile_individual_workorders)); +} + +void ForemanSingleNode::run() { + if (cpu_id_ >= 0) { + // We can pin the foreman thread to a CPU if specified. + ThreadUtil::BindToCPU(cpu_id_); + } + + // Event loop + for (;;) { + // Receive() causes this thread to sleep until next message is received. + const AnnotatedMessage annotated_msg = + bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_msg.tagged_message; + const tmb::message_type_id message_type = tagged_message.message_type(); + switch (message_type) { + case kCatalogRelationNewBlockMessage: // Fall through + case kDataPipelineMessage: + case kRebuildWorkOrderCompleteMessage: + case kWorkOrderCompleteMessage: + case kWorkOrderFeedbackMessage: + case kWorkOrdersAvailableMessage: { + policy_enforcer_->processMessage(tagged_message); + break; + } + + case kAdmitRequestMessage: { + const AdmitRequestMessage *msg = + static_cast<const AdmitRequestMessage *>(tagged_message.message()); + const vector<QueryHandle *> &query_handles = msg->getQueryHandles(); + + DCHECK(!query_handles.empty()); + bool all_queries_admitted = true; + if (query_handles.size() == 1u) { + all_queries_admitted = + policy_enforcer_->admitQuery(query_handles.front()); + } else { + all_queries_admitted = policy_enforcer_->admitQueries(query_handles); + } + if (!all_queries_admitted) { + LOG(WARNING) << "The scheduler could not admit all the queries"; + // TODO(harshad) - Inform the main thread about the failure. + } + break; + } + case kPoisonMessage: { + if (policy_enforcer_->hasQueries()) { + LOG(WARNING) << "Foreman thread exiting while some queries are " + "under execution or waiting to be admitted"; + } + return; + } + default: + LOG(FATAL) << "Unknown message type to Foreman"; + } + + if (canCollectNewMessages(message_type)) { + vector<unique_ptr<WorkerMessage>> new_messages; + policy_enforcer_->getWorkerMessages(&new_messages); + dispatchWorkerMessages(new_messages); + } + + // We check again, as some queries may produce zero work orders and finish + // their execution. + if (!policy_enforcer_->hasQueries()) { + // Signal the main thread that there are no queries to be executed. + // Currently the message doesn't have any real content. + const int dummy_payload = 0; + TaggedMessage completion_tagged_message( + &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage( + bus_, + foreman_client_id_, + main_thread_client_id_, + move(completion_tagged_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " + << foreman_client_id_ << " to main thread with TMB client ID" + << main_thread_client_id_; + } + } +} + +bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) { + if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, + kCatalogRelationNewBlockMessage, + kWorkOrderFeedbackMessage)) { + return false; + } else if (worker_directory_->getLeastLoadedWorker().second <= + FLAGS_min_load_per_worker) { + // If the least loaded worker has only one pending work order, we should + // collect new messages and dispatch them. + return true; + } else { + return false; + } +} + +void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) { + for (const auto &message : messages) { + DCHECK(message != nullptr); + const int recipient_worker_thread_index = message->getRecipientHint(); + if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) { + sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index), + *message); + worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index); + } else { + const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first; + sendWorkerMessage(least_loaded_worker_thread_index, *message); + worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index); + } + } +} + +void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index, + const WorkerMessage &message) { + tmb::message_type_id type; + if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) { + type = kRebuildWorkOrderMessage; + } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) { + type = kWorkOrderMessage; + } else { + FATAL_ERROR("Invalid WorkerMessageType"); + } + TaggedMessage worker_tagged_message(&message, sizeof(message), type); + + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + worker_directory_->getClientID(worker_thread_index), + move(worker_tagged_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << + "Message could not be sent from Foreman with TMB client ID " + << foreman_client_id_ << " to Foreman with TMB client ID " + << worker_directory_->getClientID(worker_thread_index); +} + +void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const { + const std::vector< + std::tuple<std::size_t, std::size_t, std::size_t>> + &recorded_times = policy_enforcer_->getProfilingResults(query_id); + fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out); + for (auto workorder_entry : recorded_times) { + // Note: Index of the "worker thread index" in the tuple is 0. + const std::size_t worker_id = std::get<0>(workorder_entry); + fprintf(out, + "%lu,%lu,%d,%lu,%lu\n", + query_id, + worker_id, + worker_directory_->getNUMANode(worker_id), + std::get<1>(workorder_entry), // Operator ID. + std::get<2>(workorder_entry)); // Time. + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_execution/ForemanSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp new file mode 100644 index 0000000..7506d35 --- /dev/null +++ b/query_execution/ForemanSingleNode.hpp @@ -0,0 +1,140 @@ +/** + * Copyright 2011-2015 Quickstep Technologies LLC. + * Copyright 2015-2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ + +#include <cstddef> +#include <cstdio> +#include <memory> +#include <vector> + +#include "query_execution/ForemanBase.hpp" +#include "query_execution/PolicyEnforcer.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" + +namespace quickstep { + +class CatalogDatabaseLite; +class StorageManager; +class WorkerDirectory; +class WorkerMessage; + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief The Foreman receives queries from the main thread, messages from the + * policy enforcer and dispatches the work to worker threads. It also + * receives work completion messages from workers. + **/ +class ForemanSingleNode final : public ForemanBase { + public: + /** + * @brief Constructor. + * + * @param main_thread_client_id The TMB client ID of the main thread. + * @param worker_directory The worker directory. + * @param bus A pointer to the TMB. + * @param catalog_database The catalog database where this query is executed. + * @param storage_manager The StorageManager to use. + * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. + * @param num_numa_nodes The number of NUMA nodes in the system. + * @param profile_individual_workorders Whether every workorder's execution + * be profiled or not. + * + * @note If cpu_id is not specified, Foreman thread can be possibly moved + * around on different CPUs by the OS. + **/ + ForemanSingleNode(const tmb::client_id main_thread_client_id, + WorkerDirectory *worker_directory, + tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + const int cpu_id = -1, + const std::size_t num_numa_nodes = 1, + const bool profile_individual_workorders = false); + + ~ForemanSingleNode() override {} + + /** + * @brief Print the results of profiling individual work orders for a given + * query. + * + * TODO(harshad) - Add the name of the operator to the output. + * TODO(harshad) - Add the CPU core ID of the operator to the output. This + * will require modifying the WorkerDirectory to remember worker affinities. + * Until then, the users can refer to the worker_affinities provided to the + * cli to infer the CPU core ID where a given worker is pinned. + * + * @param query_id The ID of the query for which the results are to be printed. + * @param out The file stream. + **/ + void printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const; + + protected: + void run() override; + + private: + /** + * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the + * worker threads. + * + * @param messages The messages to be dispatched. + **/ + void dispatchWorkerMessages( + const std::vector<std::unique_ptr<WorkerMessage>> &messages); + + /** + * @brief Send the given message to the specified worker. + * + * @param worker_thread_index The logical index of the recipient worker thread + * in WorkerDirectory. + * @param message The WorkerMessage to be sent. + **/ + void sendWorkerMessage(const std::size_t worker_thread_index, + const WorkerMessage &message); + + /** + * @brief Check if we can collect new messages from the PolicyEnforcer. + * + * @param message_type The type of the last received message. + **/ + bool canCollectNewMessages(const tmb::message_type_id message_type); + + const tmb::client_id main_thread_client_id_; + + WorkerDirectory *worker_directory_; + + CatalogDatabaseLite *catalog_database_; + StorageManager *storage_manager_; + + std::unique_ptr<PolicyEnforcer> policy_enforcer_; + + DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt index 5b58f75..9cad47f 100644 --- a/query_optimizer/tests/CMakeLists.txt +++ b/query_optimizer/tests/CMakeLists.txt @@ -117,7 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_Foreman + quickstep_queryexecution_ForemanSingleNode quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp index 8c1d306..563a777 100644 --- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp @@ -25,7 +25,7 @@ #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" #include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/Foreman.hpp" +#include "query_execution/ForemanSingleNode.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" #include "query_optimizer/ExecutionGenerator.hpp" http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4fb884c1/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp index bb2a26f..d1d9380 100644 --- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp @@ -25,7 +25,7 @@ #include <vector> #include "parser/SqlParserWrapper.hpp" -#include "query_execution/Foreman.hpp" +#include "query_execution/ForemanSingleNode.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" @@ -80,11 +80,12 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner { workers_.reset(new WorkerDirectory(1 /* number of workers */, worker_client_ids, numa_nodes)); - foreman_.reset(new Foreman(main_thread_client_id_, - workers_.get(), - &bus_, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager())); + foreman_.reset( + new ForemanSingleNode(main_thread_client_id_, + workers_.get(), + &bus_, + test_database_loader_.catalog_database(), + test_database_loader_.storage_manager())); foreman_->start(); worker_->start(); @@ -105,7 +106,7 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner { TestDatabaseLoader test_database_loader_; MessageBusImpl bus_; - std::unique_ptr<Foreman> foreman_; + std::unique_ptr<ForemanSingleNode> foreman_; std::unique_ptr<Worker> worker_; std::unique_ptr<WorkerDirectory> workers_;