GitHub user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1051#discussion_r156358222
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
    @@ -502,33 +438,82 @@ private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetup
           }
         }
     
    +    assert rootFragment != null;
    +
         final FragmentRoot rootOperator;
         try {
           rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
         } catch (IOException e) {
           throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
         }
         queryRM.setCost(rootOperator.getCost());
    -    admit(null);
    -    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
    -    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
     
    -    logger.debug("Submitting fragments to run.");
    +    fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
     
    -    // set up the root fragment first so we'll have incoming buffers available.
    -    setupRootFragment(rootFragment, rootOperator);
    +    startQueryProcessing();
    +  }
     
    -    setupNonRootFragments(planFragments);
    +  /**
    +   * Enqueues the query and once enqueued, starts sending out query fragments for further execution.
    +   * Moves query to RUNNING state.
    +   */
    +  private void startQueryProcessing() {
    +    enqueue();
    +    runFragments();
    +    queryStateProcessor.moveToState(QueryState.RUNNING, null);
    +  }
     
    -    moveToState(QueryState.RUNNING, null);
    -    logger.debug("Fragments running.");
    +  /**
    +   * Move query to ENQUEUED state. Enqueues query if queueing is enabled.
    +   * Foreman run will be blocked until query is enqueued.
    +   * In case of failures (ex: queue timeout exception) will move query to FAILED state.
    +   */
    +  private void enqueue() {
    +    queryStateProcessor.moveToState(QueryState.ENQUEUED, null);
    +
    +    try {
    +      queryRM.admit();
    +      queryStateProcessor.moveToState(QueryState.STARTING, null);
    +    } catch (QueueTimeoutException | QueryQueueException e) {
    +      queryStateProcessor.moveToState(QueryState.FAILED, e);
    +    } finally {
    +      String queueName = queryRM.queueName();
    +      queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
    --- End diff --
    
    @paul-rogers `QueryStateProcessor` is responsible for enforcing valid state
    transitions (directly if fragments have not been sent out yet, or through
    queuing if they have) and for proper cancelling and failing. It is also
    responsible for query counters. `Foreman` is responsible for the execution
    part and does all the work. If we move the remaining logic from `Foreman`
    into `QueryStateProcessor`, we'll end up with the same huge, unreadable
    class that `Foreman` was before. I suggest we stop the refactoring at this
    point and, if further refactoring is needed, create a new Jira.
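
    To make the split of responsibilities concrete, here is a minimal sketch of
    a component that does nothing but validate state transitions. It is not
    Drill's actual `QueryStateProcessor`: the class name, the `PREPARING`
    initial state, and the transition table are assumptions for illustration,
    and the queuing of transitions, cancellation handling, and query counters
    are left out.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; not Drill's real QueryStateProcessor. */
public class StateProcessorSketch {

  // Simplified state set. ENQUEUED, STARTING, RUNNING and FAILED appear in
  // the diff; PREPARING is an assumed initial state.
  public enum State { PREPARING, ENQUEUED, STARTING, RUNNING, FAILED }

  // Assumed transition table: each state maps to the states it may move to.
  private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
  static {
    ALLOWED.put(State.PREPARING, EnumSet.of(State.ENQUEUED, State.FAILED));
    ALLOWED.put(State.ENQUEUED, EnumSet.of(State.STARTING, State.FAILED));
    ALLOWED.put(State.STARTING, EnumSet.of(State.RUNNING, State.FAILED));
    ALLOWED.put(State.RUNNING, EnumSet.of(State.FAILED));
    ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class)); // terminal
  }

  private State current = State.PREPARING;

  /** Moves to {@code next}, or throws if the transition is not in the table. */
  public synchronized void moveToState(State next) {
    if (!ALLOWED.get(current).contains(next)) {
      throw new IllegalStateException(
          String.format("Invalid transition: %s -> %s", current, next));
    }
    current = next;
  }

  public synchronized State getState() {
    return current;
  }
}
```

    With a component like this, the caller (here, `Foreman`) keeps the
    execution logic and only asks for transitions; an out-of-order request
    such as RUNNING -> ENQUEUED is rejected in one place.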

