Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/14#discussion_r66688400
  
    --- Diff: query_execution/Foreman.cpp ---
    @@ -22,355 +22,189 @@
     #include <utility>
     #include <vector>
     
    -#include "catalog/CatalogDatabase.hpp"
    -#include "catalog/CatalogRelation.hpp"
    -#include "catalog/CatalogTypedefs.hpp"
    -#include "catalog/PartitionScheme.hpp"
    -#include "query_execution/QueryContext.hpp"
    -#include "query_execution/QueryExecutionMessages.pb.h"
    +#include "query_execution/AdmitRequestMessage.hpp"
     #include "query_execution/QueryExecutionTypedefs.hpp"
     #include "query_execution/QueryExecutionUtil.hpp"
     #include "query_execution/WorkerDirectory.hpp"
     #include "query_execution/WorkerMessage.hpp"
    -#include "relational_operators/RebuildWorkOrder.hpp"
    -#include "relational_operators/RelationalOperator.hpp"
    -#include "relational_operators/WorkOrder.hpp"
    -#include "storage/InsertDestination.hpp"
    -#include "storage/StorageBlock.hpp"
    -#include "storage/StorageBlockInfo.hpp"
     #include "threading/ThreadUtil.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
     #include "utility/Macros.hpp"
     
    +#include "gflags/gflags.h"
     #include "glog/logging.h"
     
     #include "tmb/message_bus.h"
     #include "tmb/tagged_message.h"
     
     using std::move;
    -using std::pair;
     using std::size_t;
    +using std::unique_ptr;
     using std::vector;
     
     namespace quickstep {
     
    -void Foreman::initialize() {
    +DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the 
number "
    +              "of pending work orders for the worker. This information is 
used "
    +              "by the Foreman to assign work orders to worker threads");
    +
    +Foreman::Foreman(const tmb::client_id main_thread_client_id,
    +                 WorkerDirectory *worker_directory,
    +                 tmb::MessageBus *bus,
    +                 CatalogDatabaseLite *catalog_database,
    +                 StorageManager *storage_manager,
    +                 const int cpu_id,
    +                 const size_t num_numa_nodes)
    +    : ForemanLite(bus, cpu_id),
    +      main_thread_client_id_(main_thread_client_id),
    +      worker_directory_(DCHECK_NOTNULL(worker_directory)),
    +      catalog_database_(DCHECK_NOTNULL(catalog_database)),
    +      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
    +  const std::vector<QueryExecutionMessageType> sender_message_types{
    +      kPoisonMessage,
    +      kRebuildWorkOrderMessage,
    +      kWorkOrderMessage,
    +      kWorkloadCompletionMessage};
    +
    +  for (const auto message_type : sender_message_types) {
    +    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
    +  }
    +
    +  const std::vector<QueryExecutionMessageType> receiver_message_types{
    +      kAdmitRequestMessage,
    +      kCatalogRelationNewBlockMessage,
    +      kDataPipelineMessage,
    +      kPoisonMessage,
    +      kRebuildWorkOrderCompleteMessage,
    +      kWorkOrderFeedbackMessage,
    +      kWorkOrdersAvailableMessage,
    +      kWorkOrderCompleteMessage};
    +
    +  for (const auto message_type : receiver_message_types) {
    +    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
    +  }
    +
    +  policy_enforcer_.reset(new PolicyEnforcer(
    +      foreman_client_id_,
    +      num_numa_nodes,
    +      catalog_database_,
    +      storage_manager_,
    +      bus_));
    +}
    +
    +void Foreman::run() {
       if (cpu_id_ >= 0) {
         // We can pin the foreman thread to a CPU if specified.
         ThreadUtil::BindToCPU(cpu_id_);
       }
    -  initializeState();
    -
    -  DEBUG_ASSERT(query_dag_ != nullptr);
    -  const dag_node_index dag_size = query_dag_->size();
    -
    -  // Collect all the workorders from all the relational operators in the 
DAG.
    -  for (dag_node_index index = 0; index < dag_size; ++index) {
    -    if (checkAllBlockingDependenciesMet(index)) {
    -      
query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
    -      processOperator(index, false);
    -    }
    -  }
    -
    -  // Dispatch the WorkOrders generated so far.
    -  dispatchWorkerMessages(0, 0);
    -}
    -
    -void Foreman::processWorkOrderCompleteMessage(const dag_node_index 
op_index,
    -                                              const size_t 
worker_thread_index) {
    -  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
    -
    -  // As the given worker finished executing a WorkOrder, decrement its 
number
    -  // of queued WorkOrders.
    -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
    -
    -  // Check if new work orders are available and fetch them if so.
    -  fetchNormalWorkOrders(op_index);
    -
    -  if (checkRebuildRequired(op_index)) {
    -    if (checkNormalExecutionOver(op_index)) {
    -      if (!checkRebuildInitiated(op_index)) {
    -        if (initiateRebuild(op_index)) {
    -          // Rebuild initiated and completed right away.
    -          markOperatorFinished(op_index);
    -        } else {
    -          // Rebuild under progress.
    -        }
    -      } else if (checkRebuildOver(op_index)) {
    -        // Rebuild was under progress and now it is over.
    -        markOperatorFinished(op_index);
    -      }
    -    } else {
    -      // Normal execution under progress for this operator.
    -    }
    -  } else if (checkOperatorExecutionOver(op_index)) {
    -    // Rebuild not required for this operator and its normal execution is
    -    // complete.
    -    markOperatorFinished(op_index);
    -  }
    -
    -  for (const pair<dag_node_index, bool> &dependent_link :
    -       query_dag_->getDependents(op_index)) {
    -    const dag_node_index dependent_op_index = dependent_link.first;
    -    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
    -      // Process the dependent operator (of the operator whose WorkOrder
    -      // was just executed) for which all the dependencies have been met.
    -      processOperator(dependent_op_index, true);
    -    }
    -  }
    -
    -  // Dispatch the WorkerMessages to the workers. We prefer to start the 
search
    -  // for the schedulable WorkOrders beginning from 'op_index'. The first
    -  // candidate worker to receive the next WorkOrder is the one that sent 
the
    -  // response message to Foreman.
    -  dispatchWorkerMessages(worker_thread_index, op_index);
    -}
    -
    -void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index 
op_index,
    -                                                     const size_t 
worker_thread_index) {
    -  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
    -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
    -
    -  if (checkRebuildOver(op_index)) {
    -    markOperatorFinished(op_index);
    -
    -    for (const pair<dag_node_index, bool> &dependent_link :
    -         query_dag_->getDependents(op_index)) {
    -      const dag_node_index dependent_op_index = dependent_link.first;
    -      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
    -        processOperator(dependent_op_index, true);
    -      }
    -    }
    -  }
    -
    -  // Dispatch the WorkerMessages to the workers. We prefer to start the 
search
    -  // for the schedulable WorkOrders beginning from 'op_index'. The first
    -  // candidate worker to receive the next WorkOrder is the one that sent 
the
    -  // response message to Foreman.
    -  dispatchWorkerMessages(worker_thread_index, op_index);
    -}
    -
    -void Foreman::processDataPipelineMessage(const dag_node_index op_index,
    -                                         const block_id block,
    -                                         const relation_id rel_id) {
    -  for (const dag_node_index consumer_index :
    -       output_consumers_[op_index]) {
    -    // Feed the streamed block to the consumer. Note that 
'output_consumers_'
    -    // only contain those dependents of operator with index = op_index 
which are
    -    // eligible to receive streamed input.
    -    
query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, 
rel_id);
    -    // Because of the streamed input just fed, check if there are any new
    -    // WorkOrders available and if so, fetch them.
    -    fetchNormalWorkOrders(consumer_index);
    -  }
    -
    -  // Dispatch the WorkerMessages to the workers. We prefer to start the 
search
    -  // for the schedulable WorkOrders beginning from 'op_index'. The first
    -  // candidate worker to receive the next WorkOrder is the one that sent 
the
    -  // response message to Foreman.
    -  // TODO(zuyu): Improve the data locality for the next WorkOrder.
    -  dispatchWorkerMessages(0, op_index);
    -}
    -
    -void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage 
&msg) {
    -  RelationalOperator *op =
    -      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
    -  op->receiveFeedbackMessage(msg);
    -}
    -
    -void Foreman::run() {
    -  // Initialize before for Foreman eventloop.
    -  initialize();
     
       // Event loop
    -  while (!query_exec_state_->hasQueryExecutionFinished()) {
    +  for (;;) {
         // Receive() causes this thread to sleep until next message is 
received.
    -    AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, 
true);
    +    const AnnotatedMessage annotated_msg =
    +        bus_->Receive(foreman_client_id_, 0, true);
         const TaggedMessage &tagged_message = annotated_msg.tagged_message;
    -    switch (tagged_message.message_type()) {
    -      case kWorkOrderCompleteMessage: {
    -        serialization::WorkOrderCompletionMessage proto;
    -        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
    -
    -        processWorkOrderCompleteMessage(proto.operator_index(), 
proto.worker_thread_index());
    -        break;
    -      }
    -      case kRebuildWorkOrderCompleteMessage: {
    -        serialization::WorkOrderCompletionMessage proto;
    -        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
    -
    -        processRebuildWorkOrderCompleteMessage(proto.operator_index(), 
proto.worker_thread_index());
    +    const tmb::message_type_id message_type = 
tagged_message.message_type();
    +    switch (message_type) {
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage:
    +      case kWorkOrdersAvailableMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
             break;
           }
    -      case kCatalogRelationNewBlockMessage: {
    -        serialization::CatalogRelationNewBlockMessage proto;
    -        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
    -
    -        const block_id block = proto.block_id();
    -
    -        CatalogRelation *relation =
    -            
static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
    -        relation->addBlock(block);
    -
    -        if (proto.has_partition_id()) {
    -          
relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(),
 block);
    +      case kAdmitRequestMessage: {
    +        const AdmitRequestMessage *msg =
    +            static_cast<const AdmitRequestMessage 
*>(tagged_message.message());
    +        const vector<QueryHandle *> &query_handles = 
msg->getQueryHandles();
    +
    +        DCHECK(!query_handles.empty());
    +        bool all_queries_admitted = true;
    +        if (query_handles.size() == 1u) {
    +          all_queries_admitted =
    +              policy_enforcer_->admitQuery(query_handles.front());
    +        } else {
    +          all_queries_admitted = 
policy_enforcer_->admitQueries(query_handles);
    +        }
    +        if (!all_queries_admitted) {
    +          LOG(WARNING) << "The scheduler could not admit all the queries";
    +          // TODO(harshad) - Inform the main thread about the failure.
             }
             break;
           }
    -      case kDataPipelineMessage: {
    -        // Possible message senders include InsertDestinations and some
    -        // operators which modify existing blocks.
    -        serialization::DataPipelineMessage proto;
    -        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
    -
    -        processDataPipelineMessage(proto.operator_index(), 
proto.block_id(), proto.relation_id());
    -        break;
    -      }
    -      case kWorkOrdersAvailableMessage: {
    -        serialization::WorkOrdersAvailableMessage proto;
    -        CHECK(proto.ParseFromArray(tagged_message.message(), 
tagged_message.message_bytes()));
    -
    -        const dag_node_index op_index = proto.operator_index();
    -
    -        // Check if new work orders are available.
    -        fetchNormalWorkOrders(op_index);
    -
    -        // Dispatch the WorkerMessages to the workers. We prefer to start 
the search
    -        // for the schedulable WorkOrders beginning from 'op_index'. The 
first
    -        // candidate worker to receive the next WorkOrder is the one that 
sent the
    -        // response message to Foreman.
    -        // TODO(zuyu): Improve the data locality for the next WorkOrder.
    -        dispatchWorkerMessages(0, op_index);
    -        break;
    -      }
    -      case kWorkOrderFeedbackMessage: {
    -        WorkOrder::FeedbackMessage msg(const_cast<void 
*>(tagged_message.message()),
    -                                       tagged_message.message_bytes());
    -        processFeedbackMessage(msg);
    -        break;
    +      case kPoisonMessage: {
    +        if (policy_enforcer_->hasQueries()) {
    +          LOG(WARNING) << "Foreman thread exiting while some queries are "
    +                          "under execution or waiting to be admitted";
    +        }
    +        return;
           }
           default:
             LOG(FATAL) << "Unknown message type to Foreman";
         }
    -  }
    -
    -  // Clean up before exiting.
    -  cleanUp();
    -}
     
    -void Foreman::dispatchWorkerMessages(
    -    const size_t start_worker_index,
    -    const dag_node_index start_operator_index) {
    -  // Loop over all workers. Stopping criteria:
    -  // 1. Every worker has been assigned exactly max_msgs_per_worker_ 
workorders.
    -  // OR 2. No schedulable workorders at this time.
    -  size_t done_workers_count = 0;
    -  for (size_t curr_worker = start_worker_index;
    -       done_workers_count < workers_->getNumWorkers();
    -       curr_worker = (curr_worker + 1) % workers_->getNumWorkers()) {
    -    if (workers_->getNumQueuedWorkOrders(curr_worker) < 
max_msgs_per_worker_) {
    -      std::unique_ptr<WorkerMessage> msg;
    -      msg.reset(getNextWorkerMessage(
    -          start_operator_index, workers_->getNUMANode(curr_worker)));
    -      if (msg.get() != nullptr) {
    -        sendWorkerMessage(curr_worker, *msg);
    -        workers_->incrementNumQueuedWorkOrders(curr_worker);
    -      } else {
    -        // No schedulable workorder at this point.
    -        ++done_workers_count;
    -      }
    -    } else {
    -      // curr_worker already has been assigned max_msgs_per_worker 
workorders.
    -      ++done_workers_count;
    +    if (canCollectNewMessages(message_type)) {
    +      vector<unique_ptr<WorkerMessage>> new_messages;
    +      policy_enforcer_->getWorkerMessages(&new_messages);
    +      dispatchWorkerMessages(new_messages);
    +    }
    +
    +    // We check again, as some queries may produce zero work orders and 
finish
    +    // their execution.
    +    if (!policy_enforcer_->hasQueries()) {
    +      // Signal the main thread that there are no queries to be executed.
    +      // Currently the message doesn't have any real content.
    +      const int dummy_payload = 0;
    +      TaggedMessage completion_tagged_message(
    +          &dummy_payload, sizeof(dummy_payload), 
kWorkloadCompletionMessage);
    +      const tmb::MessageBus::SendStatus send_status =
    +          QueryExecutionUtil::SendTMBMessage(
    +              bus_,
    +              foreman_client_id_,
    +              main_thread_client_id_,
    +              move(completion_tagged_message));
    +      CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
    +          << "Message could not be sent from Foreman with TMB client ID "
    +          << foreman_client_id_ << " to main thread with TMB client ID"
    +          << main_thread_client_id_;
         }
       }
     }
     
    -void Foreman::initializeState() {
    -  const dag_node_index dag_size = query_dag_->size();
    -
    -  output_consumers_.resize(dag_size);
    -  blocking_dependencies_.resize(dag_size);
    -
    -  query_exec_state_.reset(new QueryExecutionState(dag_size));
    -  workorders_container_.reset(new WorkOrdersContainer(dag_size, 
num_numa_nodes_));
    -
    -  for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) 
{
    -    const QueryContext::insert_destination_id insert_destination_index =
    -        query_dag_->getNodePayload(node_index).getInsertDestinationID();
    -    if (insert_destination_index != 
QueryContext::kInvalidInsertDestinationId) {
    -      // Rebuild is necessary whenever InsertDestination is present.
    -      query_exec_state_->setRebuildRequired(node_index);
    -      query_exec_state_->setRebuildStatus(node_index, 0, false);
    -    }
    -
    -    for (const pair<dag_node_index, bool> &dependent_link :
    -         query_dag_->getDependents(node_index)) {
    -      const dag_node_index dependent_op_index = dependent_link.first;
    -      if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
    -        // The link is not a pipeline-breaker. Streaming of blocks is 
possible
    -        // between these two operators.
    -        output_consumers_[node_index].push_back(dependent_op_index);
    -      } else {
    -        // The link is a pipeline-breaker. Streaming of blocks is not 
possible
    -        // between these two operators.
    -        blocking_dependencies_[dependent_op_index].push_back(node_index);
    -      }
    -    }
    +bool Foreman::canCollectNewMessages(const tmb::message_type_id 
message_type) {
    +  if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
    +                                    kCatalogRelationNewBlockMessage,
    +                                    kWorkOrderFeedbackMessage)) {
    +    return false;
    +  } else if (worker_directory_->getLeastLoadedWorker().second <=
    +             FLAGS_min_load_per_worker) {
    +    // If the least loaded worker has only one pending work order, we 
should
    +    // collect new messages and dispatch them.
    +    return true;
    +  } else {
    +    return false;
       }
     }
     
    -// TODO(harshad) : The default policy may execute remote WorkOrders for an
    -// operator with a lower index even when there are local WorkOrders 
available for
    -// an operator with higher index. We should examine if avoiding this 
behavior
    -// has any benefits with respect to execution time and/or memory pressure.
    -WorkerMessage* Foreman::getNextWorkerMessage(
    -    const dag_node_index start_operator_index, const int numa_node) {
    -  // Default policy: Operator with lowest index first.
    -  WorkOrder *work_order = nullptr;
    -  size_t num_operators_checked = 0;
    -  for (dag_node_index index = start_operator_index;
    -       num_operators_checked < query_dag_->size();
    -       index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
    -    if (query_exec_state_->hasExecutionFinished(index)) {
    -      continue;
    -    }
    -    if (numa_node != -1) {
    -      // First try to get a normal WorkOrder from the specified NUMA node.
    -      work_order = 
workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
    -      if (work_order != nullptr) {
    -        // A WorkOrder found on the given NUMA node.
    -        query_exec_state_->incrementNumQueuedWorkOrders(index);
    -        return WorkerMessage::WorkOrderMessage(work_order, index);
    -      } else {
    -        // Normal workorder not found on this node. Look for a rebuild 
workorder
    -        // on this NUMA node.
    -        work_order = 
workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
    -        if (work_order != nullptr) {
    -          return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
    -        }
    -      }
    -    }
    -    // Either no workorder found on the given NUMA node, or numa_node is 
-1.
    -    // Try to get a normal WorkOrder from other NUMA nodes.
    -    work_order = workorders_container_->getNormalWorkOrder(index);
    -    if (work_order != nullptr) {
    -      query_exec_state_->incrementNumQueuedWorkOrders(index);
    -      return WorkerMessage::WorkOrderMessage(work_order, index);
    +void Foreman::dispatchWorkerMessages(const 
vector<unique_ptr<WorkerMessage>> &messages) {
    +  for (auto const &message : messages) {
    +    DCHECK(message != nullptr);
    +    int recipient_worker_thread_index = message->getRecipientHint();
    --- End diff --
    
    We could mark `recipient_worker_thread_index` as `const`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to