Github user ppadma commented on a diff in the pull request:
https://github.com/apache/drill/pull/1051#discussion_r153290872
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -426,48 +413,25 @@ private void runPhysicalPlan(final PhysicalPlan plan)
throws ExecutionSetupExcep
queryManager.setTotalCost(plan.totalCost());
work.applyPlan(drillbitContext.getPlanReader());
logWorkUnit(work);
- admit(work);
- queryManager.setQueueName(queryRM.queueName());
-
- final List<PlanFragment> planFragments = work.getFragments();
- final PlanFragment rootPlanFragment = work.getRootFragment();
- assert queryId == rootPlanFragment.getHandle().getQueryId();
- drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
-
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+ fragmentsRunner.setPlanFragments(work.getFragments());
+ fragmentsRunner.setRootPlanFragment(work.getRootFragment());
+ fragmentsRunner.setRootOperator(work.getRootOperator());
- logger.debug("Submitting fragments to run.");
-
- // set up the root fragment first so we'll have incoming buffers available.
- setupRootFragment(rootPlanFragment, work.getRootOperator());
-
- setupNonRootFragments(planFragments);
-
- moveToState(QueryState.RUNNING, null);
- logger.debug("Fragments running.");
+ admit();
}
- private void admit(QueryWorkUnit work) throws ForemanSetupException {
+ private void admit() throws ForemanSetupException {
queryManager.markPlanningEndTime();
- try {
- queryRM.admit();
- } catch (QueueTimeoutException e) {
- throw UserException
- .resourceError()
- .message(e.getMessage())
- .build(logger);
- } catch (QueryQueueException e) {
- throw new ForemanSetupException(e.getMessage(), e);
- } finally {
- queryManager.markQueueWaitEndTime();
- }
- moveToState(QueryState.STARTING, null);
+ planningQueries.dec();
--- End diff --
It would be better to handle the incrementing and decrementing of the counters in moveToState.
---