sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214889652
########## File path: compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java ########## @@ -49,33 +56,40 @@ public SkewReshapingPass() { @Override public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(); - final List<MetricCollectionBarrierVertex> metricCollectionVertices = new ArrayList<>(); + final List<MetricCollectionVertex> metricCollectionVertices = new ArrayList<>(); dag.topologicalDo(v -> { // We care about OperatorVertices that have any incoming edges that are of type Shuffle. if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge -> CommunicationPatternProperty.Value.Shuffle - .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) { - final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex - = new MetricCollectionBarrierVertex<>(); - metricCollectionVertices.add(metricCollectionBarrierVertex); - builder.addVertex(v); - builder.addVertex(metricCollectionBarrierVertex); + .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get())) + && dag.getIncomingEdgesOf(v).stream().noneMatch(irEdge -> + irEdge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())) { Review comment: Please answer to this question. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services