johnyangk commented on a change in pull request #62: [NEMO-71] Add LocationShareAssignmentPass and Example Application URL: https://github.com/apache/incubator-nemo/pull/62#discussion_r199796665
########## File path: compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LocationShareAssignmentPass.java ########## @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.LocationSharesProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import org.apache.commons.math3.optim.BaseOptimizer; +import org.apache.commons.math3.optim.PointValuePair; +import org.apache.commons.math3.optim.linear.*; +import org.apache.commons.math3.optim.nonlinear.scalar.GoalType; +import org.apache.commons.math3.util.Incrementor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.*; + +/** + * Computes and assigns appropriate share of locations to each stage, + * with respect to bandwidth restrictions of locations. If bandwidth information is not given, this pass does nothing. 
+ * + * <h3>Assumptions</h3> + * This pass assumes no skew in input or intermediate data, so that the number of TaskGroups assigned to a location + * is proportional to the data size handled by the location. + * Also, this pass assumes stages with empty map as {@link LocationSharesProperty} are assigned to locations evenly. + * For example, if source splits are not distributed evenly, any source location-aware scheduling policy will + * assign TaskGroups unevenly. + * Also, this pass assumes network bandwidth to be the bottleneck. Each location should have enough capacity to run + * TaskGroups immediately as scheduler attempts to schedule a TaskGroup. + */ +public final class LocationShareAssignmentPass extends AnnotatingPass { + + private static final int OBJECTIVE_COEFFICIENT_INDEX = 0; + private static final Logger LOG = LoggerFactory.getLogger(LocationShareAssignmentPass.class); + private static final HashMap<String, Integer> EMPTY_MAP = new HashMap<>(); + + private static String bandwidthSpecificationString = ""; + + + /** + * Default constructor. 
+ */ + public LocationShareAssignmentPass() { + super(LocationSharesProperty.class, Collections.singleton(ParallelismProperty.class)); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + if (bandwidthSpecificationString.isEmpty()) { + dag.topologicalDo(irVertex -> irVertex.setProperty(LocationSharesProperty.of(EMPTY_MAP))); + } else { + assignLocationShares(dag, BandwidthSpecification.fromJsonString(bandwidthSpecificationString)); + } + return dag; + } + + public static void setBandwidthSpecificationString(final String value) { + bandwidthSpecificationString = value; + } + + private static void assignLocationShares( + final DAG<IRVertex, IREdge> dag, + final BandwidthSpecification bandwidthSpecification) { + dag.topologicalDo(irVertex -> { + final Collection<IREdge> inEdges = dag.getIncomingEdgesOf(irVertex); + final int parallelism = irVertex.getPropertyValue(ParallelismProperty.class) + .orElseThrow(() -> new RuntimeException("Parallelism property required")); + if (inEdges.size() == 0) { + // The stage is root stage. + // Fall back to setting even distribution + final HashMap<String, Integer> shares = new HashMap<>(); + final List<String> locations = bandwidthSpecification.getLocations(); + final int defaultShare = parallelism / locations.size(); + final int remainder = parallelism % locations.size(); + for (int i = 0; i < locations.size(); i++) { + shares.put(locations.get(i), defaultShare + (i < remainder ? 1 : 0)); + } + irVertex.getExecutionProperties().put(LocationSharesProperty.of(shares)); + } else if (inEdges.size() == 1 && inEdges.iterator().next() Review comment: Can you refactor the condition inside "else if (...)" into a separate private method, e.g. boolean checkXYZ() { ... }? That would make it clearer what type of IRVertex this branch handles. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services