sohami commented on a change in pull request #1677: DRILL-7068: Support memory adjustment framework for resource manageme… URL: https://github.com/apache/drill/pull/1677#discussion_r263258605
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ########## @@ -108,27 +111,106 @@ public double getAffinityFactor() { return affinityFactor; } + public Set<Wrapper> getRootFragments(PlanningSet planningSet) { + //The following code gets the root fragment by removing all the dependent fragments on which root fragments depend upon. + //This is fine because the later parallelizer code traverses from these root fragments to their respective dependent + //fragments. + final Set<Wrapper> roots = Sets.newHashSet(); + for(Wrapper w : planningSet) { + roots.add(w); + } + + //roots will be left over with the fragments which are not depended upon by any other fragments. + for(Wrapper wrapper : planningSet) { + final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); + if (fragmentDependencies != null && fragmentDependencies.size() > 0) { + for(Wrapper dependency : fragmentDependencies) { + if (roots.contains(dependency)) { + roots.remove(dependency); + } + } + } + } + + return roots; + } + + public PlanningSet prepareFragmentTree(Fragment rootFragment) { + PlanningSet planningSet = new PlanningSet(); + + initFragmentWrappers(rootFragment, planningSet); + + constructFragmentDependencyGraph(rootFragment, planningSet); + + return planningSet; + } + + /** + * Traverse all the major fragments and parallelize each major fragment based on + * collected stats. The children fragments are parallelized before a parent + * fragment. + * @param planningSet Set of all major fragments and their context. + * @param roots Root nodes of the plan. + * @param activeEndpoints currently active drillbit endpoints. 
+ * @throws PhysicalOperatorSetupException + */ + public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { + for (Wrapper wrapper : roots) { + traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> { + // If this fragment is already parallelized then no need do it again. + // This happens in the case of fragments which have MUX operators. + if (fragmentWrapper.isEndpointsAssignmentDone()) { + return; + } + fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); + fragmentWrapper.getStats() + .getDistributionAffinity() + .getFragmentParallelizer() + .parallelizeFragment(fragmentWrapper, this, activeEndpoints); + //consolidate the cpu resources required by this major fragment per drillbit. + fragmentWrapper.computeCpuResources(); + })); + } + } + + public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException; + /** - * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages - * to go beyond the global max width. + * The starting function for the whole parallelization and memory computation logic. + * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment. + * The topology of this tree is same as that of the major fragment tree. + * 2) Traverse this fragment tree to collect stats for each major fragment and then + * parallelize each fragment. At this stage minor fragments are not created but all + * the required information to create minor fragment are computed. + * 3) Memory is computed for each operator and for the minor fragment. + * 4) Lastly all the above computed information is used to create the minor fragments + * for each major fragment. 
* - * @param options Option list - * @param foremanNode The driving/foreman node for this query. (this node) - * @param queryId The queryId for this query. - * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. - * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. - * @param session UserSession of user who launched this query. - * @param queryContextInfo Info related to the context when query has started. - * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. + * @param options List of options set by the user. + * @param foremanNode foreman node for this query plan. + * @param queryId Query ID. + * @param activeEndpoints currently active endpoins on which this plan will run. + * @param rootFragment Root major fragment. + * @param session session context. + * @param queryContextInfo query context. + * @return * @throws ExecutionSetupException */ - public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, - UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + @Override + public final QueryWorkUnit generateWorkUnits(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, + Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, + UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + PlanningSet planningSet = prepareFragmentTree(rootFragment); + + Set<Wrapper> rootFragments = getRootFragments(planningSet); + + collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints); - final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); - return generateWorkUnit( - options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); + adjustMemory(planningSet, 
rootFragments, activeEndpoints); + + return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); Review comment: I am guessing that the code to actually set the memory limit on each physical operator instance on each of the minor fragments will be done in subsequent PRs? Right now it's being set using the old logic in `visitPhysicalPlan`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services