BugFix: Update NumQueuedWorkOrders to fix scheduling
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/49316237 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/49316237 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/49316237 Branch: refs/heads/adaptive-bloom-filters Commit: 49316237f54cfff6e2ea92fe3c4333f35d46190f Parents: dd44958 Author: Navneet Potti <nav...@apache.org> Authored: Tue Jun 14 21:52:25 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Wed Jun 15 00:20:34 2016 -0500 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 1 + query_execution/Foreman.cpp | 8 ++++++-- query_execution/PolicyEnforcer.cpp | 2 ++ query_execution/PolicyEnforcer.hpp | 4 ++++ 4 files changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 323e4a9..501166e 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -93,6 +93,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryManager + quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage quickstep_queryoptimizer_QueryHandle quickstep_relationaloperators_WorkOrder http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/Foreman.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp index 0577e20..828834d 100644 --- a/query_execution/Foreman.cpp +++ b/query_execution/Foreman.cpp @@ -89,6 +89,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id, num_numa_nodes, catalog_database_, storage_manager_, + worker_directory_, bus_)); } @@ -115,6 +116,7 @@ void Foreman::run() { policy_enforcer_->processMessage(tagged_message); break; } + case kAdmitRequestMessage: { const AdmitRequestMessage *msg = static_cast<const AdmitRequestMessage *>(tagged_message.message()); @@ -195,9 +197,11 @@ void Foreman::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &me if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) { sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index), *message); + worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index); } else { - sendWorkerMessage(worker_directory_->getLeastLoadedWorker().first, - *message); + const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first; + sendWorkerMessage(least_loaded_worker_thread_index, *message); + worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index); } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp index 2145429..4501026 100644 --- a/query_execution/PolicyEnforcer.cpp +++ b/query_execution/PolicyEnforcer.cpp @@ -27,6 +27,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryManager.hpp" +#include "query_execution/WorkerDirectory.hpp" #include "query_optimizer/QueryHandle.hpp" #include "relational_operators/WorkOrder.hpp" @@ -72,6 +73,7 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); query_id = proto.query_id(); + worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index()); break; } case kCatalogRelationNewBlockMessage: { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/49316237/query_execution/PolicyEnforcer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp index 5915b79..9f87056 100644 --- a/query_execution/PolicyEnforcer.hpp +++ b/query_execution/PolicyEnforcer.hpp @@ -40,6 +40,7 @@ namespace quickstep { class CatalogDatabaseLite; class QueryHandle; class StorageManager; +class WorkerDirectory; /** * @brief A class that ensures that a high level policy is maintained @@ -60,11 +61,13 @@ class PolicyEnforcer { const std::size_t num_numa_nodes, CatalogDatabaseLite *catalog_database, StorageManager *storage_manager, + WorkerDirectory *worker_directory, tmb::MessageBus *bus) : foreman_client_id_(foreman_client_id), num_numa_nodes_(num_numa_nodes), catalog_database_(catalog_database), storage_manager_(storage_manager), + worker_directory_(worker_directory), bus_(bus) {} /** @@ -148,6 +151,7 @@ class PolicyEnforcer { CatalogDatabaseLite *catalog_database_; StorageManager *storage_manager_; + WorkerDirectory *worker_directory_; tmb::MessageBus *bus_;