Long-lived Foreman thread - The Foreman thread lives for the lifetime of the Quickstep process. - The Foreman and the main thread communicate through TMB messages. - The Foreman admits queries and routes them to the PolicyEnforcer. - The Foreman relays each message to the PolicyEnforcer, which in turn processes it based on the message's query ID. - All the tests are modified accordingly.
Created PolicyEnforcer class. - First point of entry for queries in the scheduler. - Can perform admission control. - Can talk to the QueryManagers of the active queries to provide them messages to process and collect work orders for execution from them. - Support for admitting multiple queries to the PolicyEnforcer. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8230b124 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8230b124 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8230b124 Branch: refs/heads/adaptive-bloom-filters Commit: 8230b12495297b6837a66485119da44d8fb95a26 Parents: 659967a Author: Harshad Deshmukh <hars...@cs.wisc.edu> Authored: Sat Apr 9 15:08:40 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Sun Jun 12 09:18:14 2016 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 2 + catalog/CatalogTypedefs.hpp | 3 + cli/CommandExecutor.cpp | 53 +- cli/CommandExecutor.hpp | 11 +- cli/QuickstepCli.cpp | 66 +- cli/tests/CMakeLists.txt | 2 + cli/tests/CommandExecutorTestRunner.cpp | 29 +- cli/tests/CommandExecutorTestRunner.hpp | 37 +- query_execution/AdmitRequestMessage.hpp | 73 ++ query_execution/CMakeLists.txt | 70 +- query_execution/Foreman.cpp | 578 +++-------- query_execution/Foreman.hpp | 393 +------- query_execution/PolicyEnforcer.cpp | 183 ++++ query_execution/PolicyEnforcer.hpp | 167 ++++ query_execution/QueryContext.cpp | 16 +- query_execution/QueryContext.proto | 2 + query_execution/QueryExecutionMessages.proto | 4 + query_execution/QueryExecutionTypedefs.hpp | 5 +- query_execution/QueryExecutionUtil.hpp | 52 + query_execution/QueryManager.hpp | 5 +- query_execution/WorkOrdersContainer.hpp | 70 +- query_execution/Worker.cpp | 11 +- query_execution/Worker.hpp | 2 + query_execution/WorkerMessage.hpp | 24 +- 
query_execution/tests/Foreman_unittest.cpp | 952 ------------------- query_execution/tests/QueryManager_unittest.cpp | 7 +- .../tests/WorkOrdersContainer_unittest.cpp | 26 + query_optimizer/ExecutionGenerator.hpp | 1 + query_optimizer/tests/CMakeLists.txt | 2 + .../tests/ExecutionGeneratorTestRunner.cpp | 21 +- .../tests/ExecutionGeneratorTestRunner.hpp | 33 +- query_optimizer/tests/TestDatabaseLoader.cpp | 1 + relational_operators/DeleteOperator.cpp | 1 + relational_operators/DeleteOperator.hpp | 1 + relational_operators/HashJoinOperator.hpp | 4 +- relational_operators/RebuildWorkOrder.hpp | 1 + relational_operators/SortMergeRunOperator.cpp | 1 + relational_operators/UpdateOperator.cpp | 1 + relational_operators/UpdateOperator.hpp | 1 + relational_operators/WorkOrder.hpp | 20 +- .../tests/AggregationOperator_unittest.cpp | 4 + .../tests/HashJoinOperator_unittest.cpp | 6 + .../tests/SortMergeRunOperator_unittest.cpp | 4 + .../SortRunGenerationOperator_unittest.cpp | 1 + .../tests/TextScanOperator_unittest.cpp | 1 + storage/InsertDestination.cpp | 50 +- storage/InsertDestination.hpp | 71 +- 47 files changed, 1064 insertions(+), 2004 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 207f313..9e445f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -721,9 +721,11 @@ target_link_libraries(quickstep_cli_shell quickstep_cli_PrintToScreen quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_Foreman quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_Worker quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/catalog/CatalogTypedefs.hpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp index 213d91d..44832b6 100644 --- a/catalog/CatalogTypedefs.hpp +++ b/catalog/CatalogTypedefs.hpp @@ -46,6 +46,9 @@ const int kCatalogMaxID = INT_MAX; // id for the catalog ids. constexpr int kInvalidCatalogId = -1; +// Used to indicate no preference for a NUMA Node ID. +constexpr numa_node_id kAnyNUMANodeID = -1; + /** @} */ } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/CommandExecutor.cpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp index ddcd38f..dc14741 100644 --- a/cli/CommandExecutor.cpp +++ b/cli/CommandExecutor.cpp @@ -51,6 +51,8 @@ #include "glog/logging.h" +#include "tmb/id_typedefs.h" + using std::fprintf; using std::fputc; using std::fputs; @@ -58,6 +60,8 @@ using std::size_t; using std::string; using std::vector; +namespace tmb { class MessageBus; } + namespace quickstep { namespace cli { namespace { @@ -194,11 +198,14 @@ void executeDescribeTable( /** * @brief A helper function that executes a SQL query to obtain a scalar result. 
*/ -inline TypedValue executeQueryForSingleResult(const std::string &query_string, - StorageManager *storage_manager, - QueryProcessor *query_processor, - SqlParserWrapper *parser_wrapper, - Foreman *foreman) { +inline TypedValue executeQueryForSingleResult( + const tmb::client_id main_thread_client_id, + const tmb::client_id foreman_client_id, + const std::string &query_string, + tmb::MessageBus *bus, + StorageManager *storage_manager, + QueryProcessor *query_processor, + SqlParserWrapper *parser_wrapper) { parser_wrapper->feedNextBuffer(new std::string(query_string)); ParseResult result = parser_wrapper->getNextStatement(); @@ -210,11 +217,8 @@ inline TypedValue executeQueryForSingleResult(const std::string &query_string, DCHECK(query_handle->getQueryPlanMutable() != nullptr); // Use foreman to execute the query plan. - foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable()); - foreman->reconstructQueryContextFromProto(query_handle->getQueryContextProto()); - - foreman->start(); - foreman->join(); + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id, foreman_client_id, query_handle.get(), bus); // Retrieve the scalar result from the result relation. 
const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); @@ -246,8 +250,10 @@ inline TypedValue executeQueryForSingleResult(const std::string &query_string, return value; } -void executeAnalyze(QueryProcessor *query_processor, - Foreman *foreman, +void executeAnalyze(const tmb::client_id main_thread_client_id, + const tmb::client_id foreman_client_id, + MessageBus *bus, + QueryProcessor *query_processor, FILE *out) { const CatalogDatabase &database = *query_processor->getDefaultDatabase(); StorageManager *storage_manager = query_processor->getStorageManager(); @@ -273,11 +279,13 @@ void executeAnalyze(QueryProcessor *query_processor, query_string.append(";"); TypedValue num_distinct_values = - executeQueryForSingleResult(query_string, + executeQueryForSingleResult(main_thread_client_id, + foreman_client_id, + query_string, + bus, storage_manager, query_processor, - parser_wrapper.get(), - foreman); + parser_wrapper.get()); DCHECK(num_distinct_values.getTypeID() == TypeID::kLong); mutable_relation->getStatisticsMutable()->setNumDistinctValues( @@ -291,11 +299,13 @@ void executeAnalyze(QueryProcessor *query_processor, query_string.append(";"); TypedValue num_tuples = - executeQueryForSingleResult(query_string, + executeQueryForSingleResult(main_thread_client_id, + foreman_client_id, + query_string, + bus, storage_manager, query_processor, - parser_wrapper.get(), - foreman); + parser_wrapper.get()); DCHECK(num_tuples.getTypeID() == TypeID::kLong); mutable_relation->getStatisticsMutable()->setNumTuples( @@ -312,9 +322,11 @@ void executeAnalyze(QueryProcessor *query_processor, void executeCommand(const ParseStatement &statement, const CatalogDatabase &catalog_database, + const tmb::client_id main_thread_client_id, + const tmb::client_id foreman_client_id, + MessageBus *bus, StorageManager *storage_manager, QueryProcessor *query_processor, - Foreman *foreman, FILE *out) { const ParseCommand &command = static_cast<const ParseCommand 
&>(statement); const PtrVector<ParseString> *arguments = command.arguments(); @@ -328,7 +340,8 @@ void executeCommand(const ParseStatement &statement, executeDescribeTable(arguments, catalog_database, out); } } else if (command_str == C::kAnalyzeCommand) { - executeAnalyze(query_processor, foreman, out); + executeAnalyze( + main_thread_client_id, foreman_client_id, bus, query_processor, out); } else { THROW_SQL_ERROR_AT(command.command()) << "Invalid Command"; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/CommandExecutor.hpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.hpp b/cli/CommandExecutor.hpp index c819981..3435aeb 100644 --- a/cli/CommandExecutor.hpp +++ b/cli/CommandExecutor.hpp @@ -21,10 +21,14 @@ #include <cstdio> #include <string> +#include "tmb/id_typedefs.h" + using std::fprintf; using std::fputc; using std::string; +namespace tmb { class MessageBus; } + namespace quickstep { class CatalogDatabase; @@ -53,6 +57,9 @@ constexpr char kAnalyzeCommand[] = "\\analyze"; * * @param statement The parsed statement from the cli. * @param catalog_database The catalog information about the current database. + * @param main_thread_client_id The TMB client ID of the main thread. + * @param foreman_client_id The TMB client ID of the Foreman thread. + * @param bus A pointer to the TMB. * @param storage_manager The current StorageManager. * @param query_processor The query processor to generate plans for SQL queries. * @param foreman The foreman to execute query plans. 
@@ -60,9 +67,11 @@ constexpr char kAnalyzeCommand[] = "\\analyze"; */ void executeCommand(const ParseStatement &statement, const CatalogDatabase &catalog_database, + const tmb::client_id main_thread_client_id, + const tmb::client_id foreman_client_id, + tmb::MessageBus *bus, StorageManager *storage_manager, QueryProcessor *query_processor, - Foreman *foreman, FILE *out); /** @} */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/QuickstepCli.cpp ---------------------------------------------------------------------- diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp index 558d6eb..fbe7e3b 100644 --- a/cli/QuickstepCli.cpp +++ b/cli/QuickstepCli.cpp @@ -53,8 +53,10 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" #include "parser/SqlParserWrapper.hpp" +#include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/Foreman.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" #include "query_execution/WorkerMessage.hpp" @@ -95,6 +97,7 @@ using std::string; using std::vector; using quickstep::Address; +using quickstep::AdmitRequestMessage; using quickstep::CatalogRelation; using quickstep::DefaultsConfigurator; using quickstep::DropRelation; @@ -107,6 +110,7 @@ using quickstep::ParseResult; using quickstep::ParseStatement; using quickstep::PrintToScreen; using quickstep::PtrVector; +using quickstep::QueryExecutionUtil; using quickstep::QueryHandle; using quickstep::QueryPlan; using quickstep::QueryProcessor; @@ -115,9 +119,12 @@ using quickstep::TaggedMessage; using quickstep::Worker; using quickstep::WorkerDirectory; using quickstep::WorkerMessage; +using quickstep::kAdmitRequestMessage; using quickstep::kPoisonMessage; +using quickstep::kWorkloadCompletionMessage; using tmb::client_id; +using 
tmb::AnnotatedMessage; namespace quickstep { @@ -197,7 +204,9 @@ int main(int argc, char* argv[]) { // The TMB client id for the main thread, used to kill workers at the end. const client_id main_thread_client_id = bus.Connect(); + bus.RegisterClientAsSender(main_thread_client_id, kAdmitRequestMessage); bus.RegisterClientAsSender(main_thread_client_id, kPoisonMessage); + bus.RegisterClientAsReceiver(main_thread_client_id, kWorkloadCompletionMessage); // Setup the paths used by StorageManager. string fixed_storage_path(quickstep::FLAGS_storage_path); @@ -283,12 +292,6 @@ int main(int argc, char* argv[]) { std::chrono::duration<double>(preload_end - preload_start).count()); } - Foreman foreman(&bus, - query_processor->getDefaultDatabase(), - query_processor->getStorageManager(), - -1, /* CPU id to bind foreman. -1 is unbound. */ - num_numa_nodes_system); - // Get the NUMA affinities for workers. vector<int> cpu_numa_nodes = InputParserUtil::GetNUMANodesForCPUs(); if (cpu_numa_nodes.empty()) { @@ -323,13 +326,20 @@ int main(int argc, char* argv[]) { worker_client_ids, worker_numa_nodes); - foreman.setWorkerDirectory(&worker_directory); + Foreman foreman(main_thread_client_id, + &worker_directory, + &bus, + query_processor->getDefaultDatabase(), + query_processor->getStorageManager(), + num_numa_nodes_system); // Start the worker threads. 
for (Worker &worker : workers) { worker.start(); } + foreman.start(); + LineReaderImpl line_reader("quickstep> ", " ...> "); std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper()); @@ -366,9 +376,11 @@ int main(int argc, char* argv[]) { quickstep::cli::executeCommand( *result.parsed_statement, *(query_processor->getDefaultDatabase()), + main_thread_client_id, + foreman.getBusClientID(), + &bus, query_processor->getStorageManager(), query_processor.get(), - &foreman, stdout); } catch (const quickstep::SqlError &sql_error) { fprintf(stderr, "%s", @@ -389,14 +401,18 @@ int main(int argc, char* argv[]) { } DCHECK(query_handle->getQueryPlanMutable() != nullptr); - foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable()); - - foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto()); + start = std::chrono::steady_clock::now(); + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id, + foreman.getBusClientID(), + query_handle.get(), + &bus); try { - start = std::chrono::steady_clock::now(); - foreman.start(); - foreman.join(); + const AnnotatedMessage annotated_msg = + bus.Receive(main_thread_client_id, 0, true); + const TaggedMessage &tagged_message = annotated_msg.tagged_message; + DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); end = std::chrono::steady_clock::now(); const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); @@ -440,29 +456,13 @@ int main(int argc, char* argv[]) { } } - // Terminate all workers before exiting. - // The main thread broadcasts poison message to the workers. Each worker dies - // after receiving poison message. The order of workers' death is irrelavant. 
- MessageStyle style; - style.Broadcast(true); - Address address; - address.All(true); - std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage()); - TaggedMessage poison_tagged_message(poison_message.get(), - sizeof(*poison_message), - kPoisonMessage); - - const tmb::MessageBus::SendStatus send_status = - bus.Send(main_thread_client_id, - address, - style, - std::move(poison_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << - "Broadcast message from Foreman to workers failed"; + // Kill the foreman and workers. + QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id, &bus); for (Worker &worker : workers) { worker.join(); } + foreman.join(); return 0; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt index ca37e4a..d177d6c 100644 --- a/cli/tests/CMakeLists.txt +++ b/cli/tests/CMakeLists.txt @@ -32,9 +32,11 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest quickstep_cli_PrintToScreen quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_Foreman quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_Worker quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/tests/CommandExecutorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp index 9cd493e..794f7e1 100644 --- a/cli/tests/CommandExecutorTestRunner.cpp +++ b/cli/tests/CommandExecutorTestRunner.cpp @@ -20,12 +20,15 @@ #include <cstdio> #include <set> 
#include <string> +#include <utility> #include "cli/CommandExecutor.hpp" #include "cli/DropRelation.hpp" #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" +#include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/Foreman.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/Worker.hpp" #include "query_optimizer/ExecutionGenerator.hpp" #include "query_optimizer/LogicalGenerator.hpp" @@ -41,6 +44,8 @@ #include "glog/logging.h" +#include "tmb/tagged_message.h" + namespace quickstep { class CatalogRelation; @@ -87,9 +92,11 @@ void CommandExecutorTestRunner::runTestCase( quickstep::cli::executeCommand( *result.parsed_statement, *(test_database_loader_.catalog_database()), + main_thread_client_id_, + foreman_->getBusClientID(), + &bus_, test_database_loader_.storage_manager(), nullptr, - nullptr, output_stream.file()); } else { QueryHandle query_handle(optimizer_context.query_id()); @@ -100,14 +107,20 @@ void CommandExecutorTestRunner::runTestCase( physical_generator.generatePlan( logical_generator.generatePlan(*result.parsed_statement)); execution_generator.generatePlan(physical_plan); - foreman_->setQueryPlan( - query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable()); - - foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto()); - - foreman_->start(); - foreman_->join(); + AdmitRequestMessage request_message(&query_handle); + TaggedMessage admit_tagged_message( + &request_message, sizeof(request_message), kAdmitRequestMessage); + QueryExecutionUtil::SendTMBMessage(&bus_, + main_thread_client_id_, + foreman_->getBusClientID(), + std::move(admit_tagged_message)); + + // Receive workload completion message from Foreman. 
+ const AnnotatedMessage annotated_msg = + bus_.Receive(main_thread_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_msg.tagged_message; + DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); if (query_result_relation) { PrintToScreen::PrintRelation(*query_result_relation, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/cli/tests/CommandExecutorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/cli/tests/CommandExecutorTestRunner.hpp b/cli/tests/CommandExecutorTestRunner.hpp index 94b1d6a..8fb5b65 100644 --- a/cli/tests/CommandExecutorTestRunner.hpp +++ b/cli/tests/CommandExecutorTestRunner.hpp @@ -27,6 +27,7 @@ #include "parser/SqlParserWrapper.hpp" #include "query_execution/Foreman.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" #include "query_execution/WorkerMessage.hpp" @@ -34,6 +35,9 @@ #include "utility/Macros.hpp" #include "utility/textbased_test/TextBasedTestDriver.hpp" +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" + namespace quickstep { /** @@ -57,9 +61,11 @@ class CommandExecutorTestRunner : public TextBasedTestRunner { bus_.Initialize(); - foreman_.reset(new Foreman(&bus_, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager())); + main_thread_client_id_ = bus_.Connect(); + bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage); + bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage); + bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage); + worker_.reset(new Worker(0, &bus_)); std::vector<client_id> worker_client_ids; @@ -71,27 +77,20 @@ class CommandExecutorTestRunner : public TextBasedTestRunner { 
workers_.reset(new WorkerDirectory(1 /* number of workers */, worker_client_ids, numa_nodes)); - foreman_->setWorkerDirectory(workers_.get()); + foreman_.reset(new Foreman(main_thread_client_id_, + workers_.get(), + &bus_, + test_database_loader_.catalog_database(), + test_database_loader_.storage_manager())); + foreman_->start(); worker_->start(); } ~CommandExecutorTestRunner() { - std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage()); - TaggedMessage poison_tagged_message(poison_message.get(), - sizeof(*poison_message), - quickstep::kPoisonMessage); - - Address worker_address; - MessageStyle single_receiver_style; - - worker_address.AddRecipient(worker_->getBusClientID()); - bus_.Send(foreman_->getBusClientID(), - worker_address, - single_receiver_style, - std::move(poison_tagged_message)); - + QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_); worker_->join(); + foreman_->join(); } void runTestCase(const std::string &input, @@ -102,6 +101,8 @@ class CommandExecutorTestRunner : public TextBasedTestRunner { SqlParserWrapper sql_parser_; optimizer::TestDatabaseLoader test_database_loader_; + tmb::client_id main_thread_client_id_; + MessageBusImpl bus_; std::unique_ptr<Foreman> foreman_; std::unique_ptr<Worker> worker_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/AdmitRequestMessage.hpp ---------------------------------------------------------------------- diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp new file mode 100644 index 0000000..e33b354 --- /dev/null +++ b/query_execution/AdmitRequestMessage.hpp @@ -0,0 +1,73 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_ + +#include <vector> + +#include "utility/Macros.hpp" + +namespace quickstep { + +class QueryHandle; + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A message requesting a query or queries to be admitted to the system. + **/ +class AdmitRequestMessage { + public: + /** + * @brief Constructor. + * + * @param query_handles The handles of the queries requesting to be admitted + * to the system. + **/ + explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles) + : query_handles_(query_handles) {} + + /** + * @brief Constructor for requesting single query admission. + * + * @param query_handle The handle of the query requesting to be admitted. + **/ + explicit AdmitRequestMessage(QueryHandle *query_handle) { + query_handles_.push_back(query_handle); + } + + /** + * @brief Get the query handles from this message. 
+ **/ + const std::vector<QueryHandle*>& getQueryHandles() const { + return query_handles_; + } + + private: + std::vector<QueryHandle*> query_handles_; + + DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 95bc0d6..323e4a9 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -29,8 +29,10 @@ endif() if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) endif() +add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp) add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp) add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp) +add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) add_library(quickstep_queryexecution_QueryContext_proto ${queryexecution_QueryContext_proto_srcs} @@ -50,6 +52,8 @@ add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessag add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp) # Link dependencies: +target_link_libraries(quickstep_queryexecution_AdmitRequestMessage + quickstep_utility_Macros) if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_BlockLocator glog @@ -64,29 +68,17 @@ if (ENABLE_DISTRIBUTED) tmb) endif() target_link_libraries(quickstep_queryexecution_Foreman + ${GFLAGS_LIB_NAME} glog - gtest - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation - 
quickstep_catalog_CatalogTypedefs - quickstep_catalog_PartitionScheme + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_ForemanLite - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_WorkOrdersContainer quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage - quickstep_relationaloperators_RebuildWorkOrder - quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_WorkOrder - quickstep_storage_InsertDestination - quickstep_storage_StorageBlock - quickstep_storage_StorageBlockInfo quickstep_threading_ThreadUtil - quickstep_utility_DAG + quickstep_utility_EqualsAnyConstant quickstep_utility_Macros tmb) target_link_libraries(quickstep_queryexecution_ForemanLite @@ -94,6 +86,18 @@ target_link_libraries(quickstep_queryexecution_ForemanLite quickstep_threading_Thread quickstep_utility_Macros tmb) +target_link_libraries(quickstep_queryexecution_PolicyEnforcer + ${GFLAGS_LIB_NAME} + glog + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryManager + quickstep_queryexecution_WorkerMessage + quickstep_queryoptimizer_QueryHandle + quickstep_relationaloperators_WorkOrder + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_queryexecution_QueryContext glog quickstep_catalog_CatalogDatabaseLite @@ -135,7 +139,9 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionTypedefs quickstep_threading_ThreadIDBasedMap tmb) target_link_libraries(quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_WorkerMessage 
quickstep_utility_Macros tmb) target_link_libraries(quickstep_queryexecution_QueryManager @@ -189,8 +195,10 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy # Module all-in-one library: add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp) target_link_libraries(quickstep_queryexecution + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_Foreman quickstep_queryexecution_ForemanLite + quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryContext_proto quickstep_queryexecution_QueryExecutionMessages_proto @@ -235,36 +243,6 @@ if (ENABLE_DISTRIBUTED) add_test(BlockLocator_unittest BlockLocator_unittest) endif() -add_executable(Foreman_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp") -target_link_libraries(Foreman_unittest - glog - gtest - gtest_main - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_Foreman - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryContext_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_WorkOrdersContainer - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerMessage - quickstep_queryoptimizer_QueryPlan - quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_WorkOrder - quickstep_storage_InsertDestination - quickstep_storage_InsertDestination_proto - quickstep_storage_StorageBlock - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageManager - quickstep_utility_DAG - quickstep_utility_Macros - tmb) -add_test(Foreman_unittest Foreman_unittest) - add_executable(QueryManager_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp") target_link_libraries(QueryManager_unittest 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Foreman.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp index 7705819..0577e20 100644 --- a/query_execution/Foreman.cpp +++ b/query_execution/Foreman.cpp @@ -22,355 +22,189 @@ #include <utility> #include <vector> -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" -#include "catalog/CatalogTypedefs.hpp" -#include "catalog/PartitionScheme.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/WorkerDirectory.hpp" #include "query_execution/WorkerMessage.hpp" -#include "relational_operators/RebuildWorkOrder.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/StorageBlock.hpp" -#include "storage/StorageBlockInfo.hpp" #include "threading/ThreadUtil.hpp" +#include "utility/EqualsAnyConstant.hpp" #include "utility/Macros.hpp" +#include "gflags/gflags.h" #include "glog/logging.h" #include "tmb/message_bus.h" #include "tmb/tagged_message.h" using std::move; -using std::pair; using std::size_t; +using std::unique_ptr; using std::vector; namespace quickstep { -void Foreman::initialize() { +DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number " + "of pending work orders for the worker. 
This information is used " + "by the Foreman to assign work orders to worker threads"); + +Foreman::Foreman(const tmb::client_id main_thread_client_id, + WorkerDirectory *worker_directory, + tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + const int cpu_id, + const size_t num_numa_nodes) + : ForemanLite(bus, cpu_id), + main_thread_client_id_(main_thread_client_id), + worker_directory_(DCHECK_NOTNULL(worker_directory)), + catalog_database_(DCHECK_NOTNULL(catalog_database)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) { + const std::vector<QueryExecutionMessageType> sender_message_types{ + kPoisonMessage, + kRebuildWorkOrderMessage, + kWorkOrderMessage, + kWorkloadCompletionMessage}; + + for (const auto message_type : sender_message_types) { + bus_->RegisterClientAsSender(foreman_client_id_, message_type); + } + + const std::vector<QueryExecutionMessageType> receiver_message_types{ + kAdmitRequestMessage, + kCatalogRelationNewBlockMessage, + kDataPipelineMessage, + kPoisonMessage, + kRebuildWorkOrderCompleteMessage, + kWorkOrderFeedbackMessage, + kWorkOrdersAvailableMessage, + kWorkOrderCompleteMessage}; + + for (const auto message_type : receiver_message_types) { + bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); + } + + policy_enforcer_.reset(new PolicyEnforcer( + foreman_client_id_, + num_numa_nodes, + catalog_database_, + storage_manager_, + bus_)); +} + +void Foreman::run() { if (cpu_id_ >= 0) { // We can pin the foreman thread to a CPU if specified. ThreadUtil::BindToCPU(cpu_id_); } - initializeState(); - - DEBUG_ASSERT(query_dag_ != nullptr); - const dag_node_index dag_size = query_dag_->size(); - - // Collect all the workorders from all the relational operators in the DAG. 
- for (dag_node_index index = 0; index < dag_size; ++index) { - if (checkAllBlockingDependenciesMet(index)) { - query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet(); - processOperator(index, false); - } - } - - // Dispatch the WorkOrders generated so far. - dispatchWorkerMessages(0, 0); -} - -void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index, - const size_t worker_thread_index) { - query_exec_state_->decrementNumQueuedWorkOrders(op_index); - - // As the given worker finished executing a WorkOrder, decrement its number - // of queued WorkOrders. - workers_->decrementNumQueuedWorkOrders(worker_thread_index); - - // Check if new work orders are available and fetch them if so. - fetchNormalWorkOrders(op_index); - - if (checkRebuildRequired(op_index)) { - if (checkNormalExecutionOver(op_index)) { - if (!checkRebuildInitiated(op_index)) { - if (initiateRebuild(op_index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(op_index); - } else { - // Rebuild under progress. - } - } else if (checkRebuildOver(op_index)) { - // Rebuild was under progress and now it is over. - markOperatorFinished(op_index); - } - } else { - // Normal execution under progress for this operator. - } - } else if (checkOperatorExecutionOver(op_index)) { - // Rebuild not required for this operator and its normal execution is - // complete. - markOperatorFinished(op_index); - } - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - // Process the dependent operator (of the operator whose WorkOrder - // was just executed) for which all the dependencies have been met. - processOperator(dependent_op_index, true); - } - } - - // Dispatch the WorkerMessages to the workers. 
We prefer to start the search - // for the schedulable WorkOrders beginning from 'op_index'. The first - // candidate worker to receive the next WorkOrder is the one that sent the - // response message to Foreman. - dispatchWorkerMessages(worker_thread_index, op_index); -} - -void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index, - const size_t worker_thread_index) { - query_exec_state_->decrementNumRebuildWorkOrders(op_index); - workers_->decrementNumQueuedWorkOrders(worker_thread_index); - - if (checkRebuildOver(op_index)) { - markOperatorFinished(op_index); - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } - - // Dispatch the WorkerMessages to the workers. We prefer to start the search - // for the schedulable WorkOrders beginning from 'op_index'. The first - // candidate worker to receive the next WorkOrder is the one that sent the - // response message to Foreman. - dispatchWorkerMessages(worker_thread_index, op_index); -} - -void Foreman::processDataPipelineMessage(const dag_node_index op_index, - const block_id block, - const relation_id rel_id) { - for (const dag_node_index consumer_index : - output_consumers_[op_index]) { - // Feed the streamed block to the consumer. Note that 'output_consumers_' - // only contain those dependents of operator with index = op_index which are - // eligible to receive streamed input. - query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id); - // Because of the streamed input just fed, check if there are any new - // WorkOrders available and if so, fetch them. - fetchNormalWorkOrders(consumer_index); - } - - // Dispatch the WorkerMessages to the workers. 
We prefer to start the search - // for the schedulable WorkOrders beginning from 'op_index'. The first - // candidate worker to receive the next WorkOrder is the one that sent the - // response message to Foreman. - // TODO(zuyu): Improve the data locality for the next WorkOrder. - dispatchWorkerMessages(0, op_index); -} - -void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) { - RelationalOperator *op = - query_dag_->getNodePayloadMutable(msg.header().rel_op_index); - op->receiveFeedbackMessage(msg); -} - -void Foreman::run() { - // Initialize before for Foreman eventloop. - initialize(); // Event loop - while (!query_exec_state_->hasQueryExecutionFinished()) { + for (;;) { // Receive() causes this thread to sleep until next message is received. - AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true); + const AnnotatedMessage annotated_msg = + bus_->Receive(foreman_client_id_, 0, true); const TaggedMessage &tagged_message = annotated_msg.tagged_message; - switch (tagged_message.message_type()) { - case kWorkOrderCompleteMessage: { - serialization::WorkOrderCompletionMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index()); - break; - } - case kRebuildWorkOrderCompleteMessage: { - serialization::WorkOrderCompletionMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index()); + const tmb::message_type_id message_type = tagged_message.message_type(); + switch (message_type) { + case kCatalogRelationNewBlockMessage: // Fall through + case kDataPipelineMessage: + case kRebuildWorkOrderCompleteMessage: + case kWorkOrderCompleteMessage: + case kWorkOrderFeedbackMessage: + case kWorkOrdersAvailableMessage: { + 
policy_enforcer_->processMessage(tagged_message); break; } - case kCatalogRelationNewBlockMessage: { - serialization::CatalogRelationNewBlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const block_id block = proto.block_id(); - - CatalogRelation *relation = - static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); - relation->addBlock(block); - - if (proto.has_partition_id()) { - relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block); + case kAdmitRequestMessage: { + const AdmitRequestMessage *msg = + static_cast<const AdmitRequestMessage *>(tagged_message.message()); + const vector<QueryHandle *> &query_handles = msg->getQueryHandles(); + + DCHECK(!query_handles.empty()); + bool all_queries_admitted = true; + if (query_handles.size() == 1u) { + all_queries_admitted = + policy_enforcer_->admitQuery(query_handles.front()); + } else { + all_queries_admitted = policy_enforcer_->admitQueries(query_handles); + } + if (!all_queries_admitted) { + LOG(WARNING) << "The scheduler could not admit all the queries"; + // TODO(harshad) - Inform the main thread about the failure. } break; } - case kDataPipelineMessage: { - // Possible message senders include InsertDestinations and some - // operators which modify existing blocks. - serialization::DataPipelineMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processDataPipelineMessage(proto.operator_index(), proto.block_id(), proto.relation_id()); - break; - } - case kWorkOrdersAvailableMessage: { - serialization::WorkOrdersAvailableMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const dag_node_index op_index = proto.operator_index(); - - // Check if new work orders are available. - fetchNormalWorkOrders(op_index); - - // Dispatch the WorkerMessages to the workers. 
We prefer to start the search - // for the schedulable WorkOrders beginning from 'op_index'. The first - // candidate worker to receive the next WorkOrder is the one that sent the - // response message to Foreman. - // TODO(zuyu): Improve the data locality for the next WorkOrder. - dispatchWorkerMessages(0, op_index); - break; - } - case kWorkOrderFeedbackMessage: { - WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), - tagged_message.message_bytes()); - processFeedbackMessage(msg); - break; + case kPoisonMessage: { + if (policy_enforcer_->hasQueries()) { + LOG(WARNING) << "Foreman thread exiting while some queries are " + "under execution or waiting to be admitted"; + } + return; } default: LOG(FATAL) << "Unknown message type to Foreman"; } - } - - // Clean up before exiting. - cleanUp(); -} -void Foreman::dispatchWorkerMessages( - const size_t start_worker_index, - const dag_node_index start_operator_index) { - // Loop over all workers. Stopping criteria: - // 1. Every worker has been assigned exactly max_msgs_per_worker_ workorders. - // OR 2. No schedulable workorders at this time. - size_t done_workers_count = 0; - for (size_t curr_worker = start_worker_index; - done_workers_count < workers_->getNumWorkers(); - curr_worker = (curr_worker + 1) % workers_->getNumWorkers()) { - if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) { - std::unique_ptr<WorkerMessage> msg; - msg.reset(getNextWorkerMessage( - start_operator_index, workers_->getNUMANode(curr_worker))); - if (msg.get() != nullptr) { - sendWorkerMessage(curr_worker, *msg); - workers_->incrementNumQueuedWorkOrders(curr_worker); - } else { - // No schedulable workorder at this point. - ++done_workers_count; - } - } else { - // curr_worker already has been assigned max_msgs_per_worker workorders. 
- ++done_workers_count; + if (canCollectNewMessages(message_type)) { + vector<unique_ptr<WorkerMessage>> new_messages; + policy_enforcer_->getWorkerMessages(&new_messages); + dispatchWorkerMessages(new_messages); + } + + // We check again, as some queries may produce zero work orders and finish + // their execution. + if (!policy_enforcer_->hasQueries()) { + // Signal the main thread that there are no queries to be executed. + // Currently the message doesn't have any real content. + const int dummy_payload = 0; + TaggedMessage completion_tagged_message( + &dummy_payload, sizeof(dummy_payload), kWorkloadCompletionMessage); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage( + bus_, + foreman_client_id_, + main_thread_client_id_, + move(completion_tagged_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " + << foreman_client_id_ << " to main thread with TMB client ID" + << main_thread_client_id_; } } } -void Foreman::initializeState() { - const dag_node_index dag_size = query_dag_->size(); - - output_consumers_.resize(dag_size); - blocking_dependencies_.resize(dag_size); - - query_exec_state_.reset(new QueryExecutionState(dag_size)); - workorders_container_.reset(new WorkOrdersContainer(dag_size, num_numa_nodes_)); - - for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) { - const QueryContext::insert_destination_id insert_destination_index = - query_dag_->getNodePayload(node_index).getInsertDestinationID(); - if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) { - // Rebuild is necessary whenever InsertDestination is present. 
- query_exec_state_->setRebuildRequired(node_index); - query_exec_state_->setRebuildStatus(node_index, 0, false); - } - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(node_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) { - // The link is not a pipeline-breaker. Streaming of blocks is possible - // between these two operators. - output_consumers_[node_index].push_back(dependent_op_index); - } else { - // The link is a pipeline-breaker. Streaming of blocks is not possible - // between these two operators. - blocking_dependencies_[dependent_op_index].push_back(node_index); - } - } +bool Foreman::canCollectNewMessages(const tmb::message_type_id message_type) { + if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, + kCatalogRelationNewBlockMessage, + kWorkOrderFeedbackMessage)) { + return false; + } else if (worker_directory_->getLeastLoadedWorker().second <= + FLAGS_min_load_per_worker) { + // If the least loaded worker has only one pending work order, we should + // collect new messages and dispatch them. + return true; + } else { + return false; } } -// TODO(harshad) : The default policy may execute remote WorkOrders for an -// operator with a lower index even when there are local WorkOrders available for -// an operator with higher index. We should examine if avoiding this behavior -// has any benefits with respect to execution time and/or memory pressure. -WorkerMessage* Foreman::getNextWorkerMessage( - const dag_node_index start_operator_index, const int numa_node) { - // Default policy: Operator with lowest index first. 
- WorkOrder *work_order = nullptr; - size_t num_operators_checked = 0; - for (dag_node_index index = start_operator_index; - num_operators_checked < query_dag_->size(); - index = (index + 1) % query_dag_->size(), ++num_operators_checked) { - if (query_exec_state_->hasExecutionFinished(index)) { - continue; - } - if (numa_node != -1) { - // First try to get a normal WorkOrder from the specified NUMA node. - work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node); - if (work_order != nullptr) { - // A WorkOrder found on the given NUMA node. - query_exec_state_->incrementNumQueuedWorkOrders(index); - return WorkerMessage::WorkOrderMessage(work_order, index); - } else { - // Normal workorder not found on this node. Look for a rebuild workorder - // on this NUMA node. - work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node); - if (work_order != nullptr) { - return WorkerMessage::RebuildWorkOrderMessage(work_order, index); - } - } - } - // Either no workorder found on the given NUMA node, or numa_node is -1. - // Try to get a normal WorkOrder from other NUMA nodes. - work_order = workorders_container_->getNormalWorkOrder(index); - if (work_order != nullptr) { - query_exec_state_->incrementNumQueuedWorkOrders(index); - return WorkerMessage::WorkOrderMessage(work_order, index); +void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) { + for (const auto &message : messages) { + DCHECK(message != nullptr); + const int recipient_worker_thread_index = message->getRecipientHint(); + if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) { + sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index), + *message); } else { - // Normal WorkOrder not found, look for a RebuildWorkOrder. 
- work_order = workorders_container_->getRebuildWorkOrder(index); - if (work_order != nullptr) { - return WorkerMessage::RebuildWorkOrderMessage(work_order, index); - } + sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first, + *message); } } - // No WorkOrders available right now. - return nullptr; } -void Foreman::sendWorkerMessage(const std::size_t worker_thread_index, +void Foreman::sendWorkerMessage(const size_t worker_thread_index, const WorkerMessage &message) { - message_type_id type; + tmb::message_type_id type; if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) { type = kRebuildWorkOrderMessage; } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) { @@ -383,152 +217,12 @@ void Foreman::sendWorkerMessage(const std::size_t worker_thread_index, const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, - workers_->getClientID(worker_thread_index), + worker_directory_->getClientID(worker_thread_index), move(worker_tagged_message)); CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ << " to Foreman with TMB client ID " - << workers_->getClientID(worker_thread_index); -} - -bool Foreman::fetchNormalWorkOrders(const dag_node_index index) { - bool generated_new_workorders = false; - if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { - // Do not fetch any work units until all blocking dependencies are met. - // The releational operator is not aware of blocking dependencies for - // uncorrelated scalar queries. 
- if (!checkAllBlockingDependenciesMet(index)) { - return false; - } - const size_t num_pending_workorders_before = - workorders_container_->getNumNormalWorkOrders(index); - const bool done_generation = - query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), - query_context_.get(), - storage_manager_, - foreman_client_id_, - bus_); - if (done_generation) { - query_exec_state_->setDoneGenerationWorkOrders(index); - } - - // TODO(shoban): It would be a good check to see if operator is making - // useful progress, i.e., the operator either generates work orders to - // execute or still has pending work orders executing. However, this will not - // work if Foreman polls operators without feeding data. This check can be - // enabled, if Foreman is refactored to call getAllWorkOrders() only when - // pending work orders are completed or new input blocks feed. - - generated_new_workorders = - (num_pending_workorders_before < - workorders_container_->getNumNormalWorkOrders(index)); - } - return generated_new_workorders; -} - -void Foreman::processOperator(const dag_node_index index, - const bool recursively_check_dependents) { - if (fetchNormalWorkOrders(index)) { - // Fetched work orders. Return to wait for the generated work orders to - // execute, and skip the execution-finished checks. - return; - } - - if (checkNormalExecutionOver(index)) { - if (checkRebuildRequired(index)) { - if (!checkRebuildInitiated(index)) { - // Rebuild hasn't started, initiate it. - if (initiateRebuild(index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(index); - } else { - // Rebuild WorkOrders have been generated. - return; - } - } else if (checkRebuildOver(index)) { - // Rebuild had been initiated and it is over. - markOperatorFinished(index); - } - } else { - // Rebuild is not required and normal execution over, mark finished. 
- markOperatorFinished(index); - } - // If we reach here, that means the operator has been marked as finished. - if (recursively_check_dependents) { - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } - } -} - -void Foreman::markOperatorFinished(const dag_node_index index) { - query_exec_state_->setExecutionFinished(index); - - RelationalOperator *op = query_dag_->getNodePayloadMutable(index); - op->updateCatalogOnCompletion(); - - const relation_id output_rel = op->getOutputRelationID(); - for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) { - const dag_node_index dependent_op_index = dependent_link.first; - RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index); - // Signal dependent operator that current operator is done feeding input blocks. 
- if (output_rel >= 0) { - dependent_op->doneFeedingInputBlocks(output_rel); - } - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - dependent_op->informAllBlockingDependenciesMet(); - } - } -} - -bool Foreman::initiateRebuild(const dag_node_index index) { - DEBUG_ASSERT(!workorders_container_->hasRebuildWorkOrder(index)); - DEBUG_ASSERT(checkRebuildRequired(index)); - DEBUG_ASSERT(!checkRebuildInitiated(index)); - - getRebuildWorkOrders(index, workorders_container_.get()); - - query_exec_state_->setRebuildStatus( - index, workorders_container_->getNumRebuildWorkOrders(index), true); - - return (query_exec_state_->getNumRebuildWorkOrders(index) == 0); -} - -void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container) { - const RelationalOperator &op = query_dag_->getNodePayload(index); - const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID(); - - if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) { - return; - } - - vector<MutableBlockReference> partially_filled_block_refs; - - DCHECK(query_context_ != nullptr); - InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index); - DCHECK(insert_destination != nullptr); - - insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); - - for (vector<MutableBlockReference>::size_type i = 0; - i < partially_filled_block_refs.size(); - ++i) { - // Note: The query ID used below is dummy for now, it will be replaced with - // the true query ID when QueryManager gets used in Foreman. 
- container->addRebuildWorkOrder( - new RebuildWorkOrder(0, - move(partially_filled_block_refs[i]), - index, - op.getOutputRelationID(), - foreman_client_id_, - bus_), - index); - } + << worker_directory_->getClientID(worker_thread_index); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_execution/Foreman.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp index 2d6e0d3..94cb9fc 100644 --- a/query_execution/Foreman.hpp +++ b/query_execution/Foreman.hpp @@ -22,22 +22,11 @@ #include <memory> #include <vector> -#include "catalog/CatalogTypedefs.hpp" #include "query_execution/ForemanLite.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/WorkOrdersContainer.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "utility/DAG.hpp" +#include "query_execution/PolicyEnforcer.hpp" #include "utility/Macros.hpp" -#include "glog/logging.h" -#include "gtest/gtest_prod.h" - +#include "tmb/id_typedefs.h" #include "tmb/message_bus.h" namespace quickstep { @@ -45,23 +34,24 @@ namespace quickstep { class CatalogDatabaseLite; class StorageManager; class WorkerDirectory; - -namespace serialization { class QueryContext; } +class WorkerMessage; /** \addtogroup QueryExecution * @{ */ /** - * @brief The Foreman scans the query DAG, requests each operator to produce - * workorders. It also pipelines the intermediate output it receives to - * the relational operators which need it. + * @brief The Foreman receives queries from the main thread, messages from the + * policy enforcer and dispatches the work to worker threads. 
It also + * receives work completion messages from workers. **/ class Foreman final : public ForemanLite { public: /** * @brief Constructor. * + * @param main_thread_client_id The TMB client ID of the main thread. + * @param worker_directory The worker directory. * @param bus A pointer to the TMB. * @param catalog_database The catalog database where this query is executed. * @param storage_manager The StorageManager to use. @@ -71,233 +61,28 @@ class Foreman final : public ForemanLite { * @note If cpu_id is not specified, Foreman thread can be possibly moved * around on different CPUs by the OS. **/ - Foreman(tmb::MessageBus *bus, + Foreman(const tmb::client_id main_thread_client_id, + WorkerDirectory *worker_directory, + tmb::MessageBus *bus, CatalogDatabaseLite *catalog_database, StorageManager *storage_manager, const int cpu_id = -1, - const int num_numa_nodes = 1) - : ForemanLite(bus, cpu_id), - catalog_database_(DCHECK_NOTNULL(catalog_database)), - storage_manager_(DCHECK_NOTNULL(storage_manager)), - max_msgs_per_worker_(1), - num_numa_nodes_(num_numa_nodes) { - bus_->RegisterClientAsSender(foreman_client_id_, kWorkOrderMessage); - bus_->RegisterClientAsSender(foreman_client_id_, kRebuildWorkOrderMessage); - // NOTE : Foreman thread sends poison messages in the optimizer's - // ExecutionGeneratorTest. 
- bus_->RegisterClientAsSender(foreman_client_id_, kPoisonMessage); - - bus_->RegisterClientAsReceiver(foreman_client_id_, - kWorkOrderCompleteMessage); - bus_->RegisterClientAsReceiver(foreman_client_id_, - kRebuildWorkOrderCompleteMessage); - bus_->RegisterClientAsReceiver(foreman_client_id_, kCatalogRelationNewBlockMessage); - bus_->RegisterClientAsReceiver(foreman_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsReceiver(foreman_client_id_, - kWorkOrdersAvailableMessage); - bus_->RegisterClientAsReceiver(foreman_client_id_, - kWorkOrderFeedbackMessage); - } + const std::size_t num_numa_nodes = 1); ~Foreman() override {} - /** - * @brief Set the Query plan DAG for the query to be executed. - * - * @param query_plan_dag A pointer to the query plan DAG. - **/ - inline void setQueryPlan(DAG<RelationalOperator, bool> *query_plan_dag) { - query_dag_ = query_plan_dag; - } - - /** - * @brief Reconstruct the QueryContext for the query to be executed. - * - * @param proto The serialized QueryContext. - **/ - inline void reconstructQueryContextFromProto(const serialization::QueryContext &proto) { - query_context_.reset( - new QueryContext(proto, *catalog_database_, storage_manager_, foreman_client_id_, bus_)); - } - - /** - * @brief Set the WorkerDirectory pointer. - * - * @param workers A pointer to the WorkerDirectory. - **/ - void setWorkerDirectory(WorkerDirectory *workers) { - workers_ = workers; - } - - /** - * @brief Set the maximum number of messages that should be allocated to each - * worker during a single round of WorkOrder dispatch. - * - * @param max_msgs_per_worker Maximum number of messages. 
- **/ - void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) { - max_msgs_per_worker_ = max_msgs_per_worker; - } - protected: - /** - * @brief The foreman receives a DAG of relational operators, asks relational - * operators to produce the workorders and based on the response it gets - * pipelines the intermediate output to dependent relational operators. - * - * @note The workers who get the messages from the Foreman execute and - * subsequently delete the WorkOrder contained in the message. - **/ void run() override; private: - typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index; - - /** - * @brief Check if all the dependencies of the node at specified index have - * finished their execution. - * - * @note This function's true return value is a pre-requisite for calling - * getRebuildWorkOrders() - * - * @param node_index The index of the specified node in the query DAG. - * - * @return True if all the dependencies have finished their execution. False - * otherwise. - **/ - inline bool checkAllDependenciesMet(const dag_node_index node_index) const { - for (const dag_node_index dependency_index : query_dag_->getDependencies(node_index)) { - // If at least one of the dependencies is not met, return false. - if (!query_exec_state_->hasExecutionFinished(dependency_index)) { - return false; - } - } - return true; - } - - /** - * @brief Check if all the blocking dependencies of the node at specified - * index have finished their execution. - * - * @note A blocking dependency is the one which is pipeline breaker. Output of - * a dependency can't be streamed to its dependent if the link between - * them is pipeline breaker. - * - * @param node_index The index of the specified node in the query DAG. - * - * @return True if all the blocking dependencies have finished their - * execution. False otherwise. 
- **/ - inline bool checkAllBlockingDependenciesMet(const dag_node_index node_index) const { - for (const dag_node_index blocking_dependency_index : blocking_dependencies_[node_index]) { - if (!query_exec_state_->hasExecutionFinished(blocking_dependency_index)) { - return false; - } - } - return true; - } - /** * @brief Dispatch schedulable WorkOrders, wrapped in WorkerMessages to the * worker threads. * - * @param start_worker_index The dispatch of WorkOrders preferably begins with - * the worker at this index. - * @param start_operator_index The search for a schedulable WorkOrder - * begins with the WorkOrders generated by this operator. - **/ - void dispatchWorkerMessages(const std::size_t start_worker_index, - const dag_node_index start_operator_index); - - /** - * @brief Initialize all the local vectors and maps. If the operator has an - * InsertDestination, pass the bus address and Foreman's TMB client ID - * to it. - **/ - void initializeState(); - - /** - * @brief Initialize the Foreman before starting the event loop. This binds - * the Foreman thread to configured CPU, and does initial processing of - * operator before waiting for events from Workers. - **/ - void initialize(); - - /** - * @brief Process the received WorkOrder complete message. - * - * @param node_index The index of the specified operator node in the query DAG - * for the completed WorkOrder. - * @param worker_thread_index The logical index of the worker thread in - * WorkerDirectory for the completed WorkOrder. - **/ - void processWorkOrderCompleteMessage(const dag_node_index op_index, - const std::size_t worker_thread_index); - - /** - * @brief Process the received RebuildWorkOrder complete message. - * - * @param node_index The index of the specified operator node in the query DAG - * for the completed RebuildWorkOrder. - * @param worker_thread_index The logical index of the worker thread in - * WorkerDirectory for the completed RebuildWorkOrder. 
- **/ - void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index, - const std::size_t worker_thread_index); - - /** - * @brief Process the received data pipeline message. - * - * @param node_index The index of the specified operator node in the query DAG - * for the pipelining block. - * @param block The block id. - * @param rel_id The ID of the relation that produced 'block'. - **/ - void processDataPipelineMessage(const dag_node_index op_index, - const block_id block, - const relation_id rel_id); - - /** - * @brief Process the received work order feedback message and notify relational - * operator. - * - * @param message Feedback message from work order. - **/ - void processFeedbackMessage(const WorkOrder::FeedbackMessage &message); - - /** - * @brief Clear some of the vectors used for a single run of a query. - **/ - void cleanUp() { - output_consumers_.clear(); - blocking_dependencies_.clear(); - } - - /** - * @brief Process a current relational operator: Get its workorders and store - * them in the WorkOrdersContainer for this query. If the operator can - * be marked as done, do so. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * @param recursively_check_dependents If an operator is done, should we - * call processOperator on its dependents recursively. + * @param messages The messages to be dispatched. **/ - void processOperator(const dag_node_index index, const bool recursively_check_dependents); - - /** - * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage. - * - * @param start_operator_index Begin the search for the schedulable WorkOrder - * with the operator at this index. - * @param numa_node The next WorkOrder should preferably have its input(s) - * from this numa_node. This is a hint and not a binding requirement. - * - * @return A pointer to the WorkerMessage. If there's no WorkOrder to be - * executed, return NULL. 
- **/ - WorkerMessage* getNextWorkerMessage( - const dag_node_index start_operator_index, const int numa_node = -1); + void dispatchWorkerMessages( + const std::vector<std::unique_ptr<WorkerMessage>> &messages); /** * @brief Send the given message to the specified worker. @@ -306,156 +91,24 @@ class Foreman final : public ForemanLite { * in WorkerDirectory. * @param message The WorkerMessage to be sent. **/ - void sendWorkerMessage(const std::size_t worker_thread_index, const WorkerMessage &message); - - /** - * @brief Fetch all work orders currently available in relational operator and - * store them internally. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * - * @return Whether any work order was generated by op. - **/ - bool fetchNormalWorkOrders(const dag_node_index index); - - /** - * @brief This function does the following things: - * 1. Mark the given relational operator as "done". - * 2. For all the dependents of this operator, check if all of their - * blocking dependencies are met. If so inform them that the blocking - * dependencies are met. - * 3. Check if the given operator is done producing output. If it's - * done, inform the dependents that they won't receive input anymore - * from the given operator. - * - * @param index The index of the given relational operator in the DAG. - **/ - void markOperatorFinished(const dag_node_index index); - - /** - * @brief Check if the execution of the given operator is over. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the execution of the given operator is over, false - * otherwise. - **/ - inline bool checkOperatorExecutionOver(const dag_node_index index) const { - if (checkRebuildRequired(index)) { - return (checkNormalExecutionOver(index) && checkRebuildOver(index)); - } else { - return checkNormalExecutionOver(index); - } - } - - /** - * @brief Check if the given operator's normal execution is over. 
- * - * @note The conditions for a given operator's normal execution to get over: - * 1. All of its normal (i.e. non rebuild) WorkOrders have finished - * execution. - * 2. The operator is done generating work orders. - * 3. All of the dependencies of the given operator have been met. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the normal execution of the given operator is over, false - * otherwise. - **/ - inline bool checkNormalExecutionOver(const dag_node_index index) const { - return (checkAllDependenciesMet(index) && - !workorders_container_->hasNormalWorkOrder(index) && - query_exec_state_->getNumQueuedWorkOrders(index) == 0 && - query_exec_state_->hasDoneGenerationWorkOrders(index)); - } + void sendWorkerMessage(const std::size_t worker_thread_index, + const WorkerMessage &message); /** - * @brief Check if the rebuild operation is required for a given operator. - * - * @param index The index of the given operator in the DAG. + * @brief Check if we can collect new messages from the PolicyEnforcer. * - * @return True if the rebuild operation is required, false otherwise. + * @param message_type The type of the last received message. **/ - inline bool checkRebuildRequired(const dag_node_index index) const { - return query_exec_state_->isRebuildRequired(index); - } + bool canCollectNewMessages(const tmb::message_type_id message_type); - /** - * @brief Check if the rebuild operation for a given operator is over. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild operation is over, false otherwise. 
- **/ - inline bool checkRebuildOver(const dag_node_index index) const { - return query_exec_state_->hasRebuildInitiated(index) && - !workorders_container_->hasRebuildWorkOrder(index) && - (query_exec_state_->getNumRebuildWorkOrders(index) == 0); - } + const tmb::client_id main_thread_client_id_; - /** - * @brief Check if the rebuild operation for a given operator has been - * initiated. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild operation has been initiated, false otherwise. - **/ - inline bool checkRebuildInitiated(const dag_node_index index) const { - return query_exec_state_->hasRebuildInitiated(index); - } - - /** - * @brief Initiate the rebuild process for partially filled blocks generated - * during the execution of the given operator. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild is over immediately, i.e. the operator didn't - * generate any rebuild WorkOrders, false otherwise. - **/ - bool initiateRebuild(const dag_node_index index); - - /** - * @brief Get the rebuild WorkOrders for an operator. - * - * @note This function should be called only once, when all the normal - * WorkOrders generated by an operator finish their execution. - * - * @param index The index of the operator in the query plan DAG. - * @param container A pointer to a WorkOrdersContainer to be used to store the - * generated WorkOrders. - **/ - void getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container); + WorkerDirectory *worker_directory_; CatalogDatabaseLite *catalog_database_; StorageManager *storage_manager_; - DAG<RelationalOperator, bool> *query_dag_; - - std::unique_ptr<QueryContext> query_context_; - - // During a single round of WorkOrder dispatch, a Worker should be allocated - // at most these many WorkOrders. - std::size_t max_msgs_per_worker_; - - // For all nodes, store their receiving dependents. 
- std::vector<std::vector<dag_node_index>> output_consumers_; - - // For all nodes, store their pipeline breaking dependencies (if any). - std::vector<std::vector<dag_node_index>> blocking_dependencies_; - - std::unique_ptr<QueryExecutionState> query_exec_state_; - - std::unique_ptr<WorkOrdersContainer> workorders_container_; - - const int num_numa_nodes_; - - WorkerDirectory *workers_; - - friend class ForemanTest; - FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest); + std::unique_ptr<PolicyEnforcer> policy_enforcer_; DISALLOW_COPY_AND_ASSIGN(Foreman); };