sohami commented on a change in pull request #1677: DRILL-7068: Support memory 
adjustment framework for resource manageme…
URL: https://github.com/apache/drill/pull/1677#discussion_r263055312
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 ##########
 @@ -108,27 +111,106 @@ public double getAffinityFactor() {
     return affinityFactor;
   }
 
+  public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+    //The following code gets the root fragment by removing all the dependent 
fragments on which root fragments depend upon.
+    //This is fine because the later parallelizer code traverses from these 
root fragments to their respective dependent
+    //fragments.
+    final Set<Wrapper> roots = Sets.newHashSet();
+    for(Wrapper w : planningSet) {
+      roots.add(w);
+    }
+
+    //roots will be left over with the fragments which are not depended upon 
by any other fragments.
+    for(Wrapper wrapper : planningSet) {
+      final List<Wrapper> fragmentDependencies = 
wrapper.getFragmentDependencies();
+      if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+        for(Wrapper dependency : fragmentDependencies) {
+          if (roots.contains(dependency)) {
+            roots.remove(dependency);
+          }
+        }
+      }
+    }
+
+    return roots;
+  }
+
+  public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+    PlanningSet planningSet = new PlanningSet();
+
+    initFragmentWrappers(rootFragment, planningSet);
+
+    constructFragmentDependencyGraph(rootFragment, planningSet);
+
+    return planningSet;
+  }
+
+  /**
+   * Traverse all the major fragments and parallelize each major fragment 
based on
+   * collected stats. The children fragments are parallelized before a parent
+   * fragment.
+   * @param planningSet Set of all major fragments and their context.
+   * @param roots Root nodes of the plan.
+   * @param activeEndpoints currently active drillbit endpoints.
+   * @throws PhysicalOperatorSetupException
+   */
+  public void collectStatsAndParallelizeFragments(PlanningSet planningSet, 
Set<Wrapper> roots,
+                                                  Collection<DrillbitEndpoint> 
activeEndpoints) throws PhysicalOperatorSetupException {
+    for (Wrapper wrapper : roots) {
+      traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper 
fragmentWrapper) -> {
+        // If this fragment is already parallelized then no need do it again.
+        // This happens in the case of fragments which have MUX operators.
+        if (fragmentWrapper.isEndpointsAssignmentDone()) {
+          return;
+        }
+        fragmentWrapper.getNode().getRoot().accept(new 
StatsCollector(planningSet), fragmentWrapper);
+        fragmentWrapper.getStats()
+                       .getDistributionAffinity()
+                       .getFragmentParallelizer()
+                       .parallelizeFragment(fragmentWrapper, this, 
activeEndpoints);
+        //consolidate the cpu resources required by this major fragment per 
drillbit.
+        fragmentWrapper.computeCpuResources();
+      }));
+    }
+  }
+
+  public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> 
roots,
+                                    Collection<DrillbitEndpoint> 
activeEndpoints) throws PhysicalOperatorSetupException;
+
   /**
-   * Generate a set of assigned fragments based on the provided fragment tree. 
Do not allow parallelization stages
-   * to go beyond the global max width.
+   * The starting function for the whole parallelization and memory 
computation logic.
+   * 1) Initially a fragment tree is prepared which contains a wrapper for 
each fragment.
+   *    The topology of this tree is same as that of the major fragment tree.
+   * 2) Traverse this fragment tree to collect stats for each major fragment 
and then
+   *    parallelize each fragment. At this stage minor fragments are not 
created but all
+   *    the required information to create minor fragment are computed.
+   * 3) Memory is computed for each operator and for the minor fragment.
 
 Review comment:
   How is memory calculated for the minor fragments when minor fragments are 
generated in step 4?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to