johnyangk commented on a change in pull request #62: [NEMO-71] Add LocationShareAssignmentPass and Example Application
URL: https://github.com/apache/incubator-nemo/pull/62#discussion_r199796747
 
 

 ##########
 File path: compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LocationShareAssignmentPass.java
 ##########
 @@ -0,0 +1,237 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.LocationSharesProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.commons.math3.optim.BaseOptimizer;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.commons.math3.optim.linear.*;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.apache.commons.math3.util.Incrementor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Computes and assigns appropriate share of locations to each stage,
+ * with respect to bandwidth restrictions of locations. If bandwidth information is not given, this pass does nothing.
+ *
+ * <h3>Assumptions</h3>
+ * This pass assumes no skew in input or intermediate data, so that the number of TaskGroups assigned to a location
+ * is proportional to the data size handled by the location.
+ * Also, this pass assumes stages with empty map as {@link LocationSharesProperty} are assigned to locations evenly.
+ * For example, if source splits are not distributed evenly, any source location-aware scheduling policy will
+ * assign TaskGroups unevenly.
+ * Also, this pass assumes network bandwidth to be the bottleneck. Each location should have enough capacity to run
+ * TaskGroups immediately as scheduler attempts to schedule a TaskGroup.
+ */
+public final class LocationShareAssignmentPass extends AnnotatingPass {
+
+  private static final int OBJECTIVE_COEFFICIENT_INDEX = 0;
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocationShareAssignmentPass.class);
+  private static final HashMap<String, Integer> EMPTY_MAP = new HashMap<>();
+
+  private static String bandwidthSpecificationString = "";
+
+
+  /**
+   * Default constructor.
+   */
+  public LocationShareAssignmentPass() {
+    super(LocationSharesProperty.class, 
Collections.singleton(ParallelismProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    if (bandwidthSpecificationString.isEmpty()) {
+      dag.topologicalDo(irVertex -> 
irVertex.setProperty(LocationSharesProperty.of(EMPTY_MAP)));
+    } else {
+      assignLocationShares(dag, 
BandwidthSpecification.fromJsonString(bandwidthSpecificationString));
+    }
+    return dag;
+  }
+
+  public static void setBandwidthSpecificationString(final String value) {
+    bandwidthSpecificationString = value;
+  }
+
+  private static void assignLocationShares(
+      final DAG<IRVertex, IREdge> dag,
+      final BandwidthSpecification bandwidthSpecification) {
+    dag.topologicalDo(irVertex -> {
+      final Collection<IREdge> inEdges = dag.getIncomingEdgesOf(irVertex);
+      final int parallelism = 
irVertex.getPropertyValue(ParallelismProperty.class)
+          .orElseThrow(() -> new RuntimeException("Parallelism property 
required"));
+      if (inEdges.size() == 0) {
+        // The stage is root stage.
+        // Fall back to setting even distribution
+        final HashMap<String, Integer> shares = new HashMap<>();
+        final List<String> locations = bandwidthSpecification.getLocations();
+        final int defaultShare = parallelism / locations.size();
+        final int remainder = parallelism % locations.size();
+        for (int i = 0; i < locations.size(); i++) {
+          shares.put(locations.get(i), defaultShare + (i < remainder ? 1 : 0));
+        }
+        
irVertex.getExecutionProperties().put(LocationSharesProperty.of(shares));
+      } else if (inEdges.size() == 1 && inEdges.iterator().next()
+          .getPropertyValue(DataCommunicationPatternProperty.class).get()
+          .equals(DataCommunicationPatternProperty.Value.OneToOne)) {
+        final Optional<LocationSharesProperty> property = 
dag.getIncomingEdgesOf(irVertex).iterator().next()
+            .getExecutionProperties().get(LocationSharesProperty.class);
+        irVertex.getExecutionProperties().put(property.get());
+      } else {
+        final Map<String, Integer> parentLocationShares = new HashMap<>();
 
 Review comment:
   Could you add a comment here explaining what type of IRVertex this is?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to