sohami commented on a change in pull request #1677: DRILL-7068: Support memory adjustment framework for resource management…
URL: https://github.com/apache/drill/pull/1677#discussion_r263258605
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 ##########
 @@ -108,27 +111,106 @@ public double getAffinityFactor() {
     return affinityFactor;
   }
 
+  public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+    //The following code gets the root fragment by removing all the dependent fragments on which root fragments depend upon.
+    //This is fine because the later parallelizer code traverses from these root fragments to their respective dependent
+    //fragments.
+    final Set<Wrapper> roots = Sets.newHashSet();
+    for(Wrapper w : planningSet) {
+      roots.add(w);
+    }
+
+    //roots will be left over with the fragments which are not depended upon by any other fragments.
+    for(Wrapper wrapper : planningSet) {
+      final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
+      if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+        for(Wrapper dependency : fragmentDependencies) {
+          if (roots.contains(dependency)) {
+            roots.remove(dependency);
+          }
+        }
+      }
+    }
+
+    return roots;
+  }
+
+  public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+    PlanningSet planningSet = new PlanningSet();
+
+    initFragmentWrappers(rootFragment, planningSet);
+
+    constructFragmentDependencyGraph(rootFragment, planningSet);
+
+    return planningSet;
+  }
+
+  /**
+   * Traverse all the major fragments and parallelize each major fragment based on
+   * collected stats. The children fragments are parallelized before a parent
+   * fragment.
+   * @param planningSet Set of all major fragments and their context.
+   * @param roots Root nodes of the plan.
+   * @param activeEndpoints currently active drillbit endpoints.
+   * @throws PhysicalOperatorSetupException
+   */
+  public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots,
+                                                  Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+    for (Wrapper wrapper : roots) {
+      traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> {
+        // If this fragment is already parallelized then no need do it again.
+        // This happens in the case of fragments which have MUX operators.
+        if (fragmentWrapper.isEndpointsAssignmentDone()) {
+          return;
+        }
+        fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
+        fragmentWrapper.getStats()
+                       .getDistributionAffinity()
+                       .getFragmentParallelizer()
+                       .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
+        //consolidate the cpu resources required by this major fragment per drillbit.
+        fragmentWrapper.computeCpuResources();
+      }));
+    }
+  }
+
+  public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+                                    Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+
   /**
-   * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
-   * to go beyond the global max width.
+   * The starting function for the whole parallelization and memory computation logic.
+   * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
+   *    The topology of this tree is same as that of the major fragment tree.
+   * 2) Traverse this fragment tree to collect stats for each major fragment and then
+   *    parallelize each fragment. At this stage minor fragments are not created but all
+   *    the required information to create minor fragment are computed.
+   * 3) Memory is computed for each operator and for the minor fragment.
+   * 4) Lastly all the above computed information is used to create the minor fragments
+   *    for each major fragment.
    *
-   * @param options         Option list
-   * @param foremanNode     The driving/foreman node for this query.  (this node)
-   * @param queryId         The queryId for this query.
-   * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
-   * @param rootFragment    The root node of the PhysicalPlan that we will be parallelizing.
-   * @param session         UserSession of user who launched this query.
-   * @param queryContextInfo Info related to the context when query has started.
-   * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
+   * @param options List of options set by the user.
+   * @param foremanNode foreman node for this query plan.
+   * @param queryId  Query ID.
+   * @param activeEndpoints currently active endpoins on which this plan will run.
+   * @param rootFragment Root major fragment.
+   * @param session session context.
+   * @param queryContextInfo query context.
+   * @return
    * @throws ExecutionSetupException
    */
-  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
-      Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
-      UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+  @Override
+  public final QueryWorkUnit generateWorkUnits(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+                                               Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+                                               UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+    PlanningSet planningSet = prepareFragmentTree(rootFragment);
+
+    Set<Wrapper> rootFragments = getRootFragments(planningSet);
+
+    collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
 
-    final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
-    return generateWorkUnit(
-        options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
+    adjustMemory(planningSet, rootFragments, activeEndpoints);
+
+    return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
 
 Review comment:
   I am guessing that the code to actually set the memory limit on each physical operator instance in each minor fragment will be done in subsequent PRs? Right now it's being set using the old logic in `visitPhysicalPlan`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to