jeongyooneo closed pull request #38: [NEMO-48] Do not merge broadcasted data into a single partition
URL: https://github.com/apache/incubator-nemo/pull/38
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java index 3b586374..2d90388d 100644 --- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java +++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java @@ -16,8 +16,6 @@ package edu.snu.nemo.common.dag; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -//import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty; import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; @@ -243,15 +241,6 @@ private void sinkCheck() { * Helper method to check that all execution properties are correct and makes sense. 
*/ private void executionPropertyCheck() { - // SideInput edge must be broadcast - vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e) - .filter(e -> Boolean.TRUE.equals(e.isSideInput())) - .filter(e -> !(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern)) - .equals(DataCommunicationPatternProperty.Value.BroadCast)) - .forEach(e -> { - throw new RuntimeException("DAG execution property check: " - + "SideInput edge must be broadcast: " + e.getId()); - })); // SideInput is not compatible with Push vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e) .filter(e -> Boolean.TRUE.equals(e.isSideInput())) diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java index e3072855..bf3253be 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java @@ -263,7 +263,7 @@ private static BeamCoder getCoderForView(final ViewFn viewFn, final Coder beamIn if (dstTransform instanceof GroupByKeyTransform) { return DataCommunicationPatternProperty.Value.Shuffle; } - if (srcTransform instanceof CreateViewTransform || dstTransform instanceof CreateViewTransform) { + if (dstTransform instanceof CreateViewTransform) { return DataCommunicationPatternProperty.Value.BroadCast; } return DataCommunicationPatternProperty.Value.OneToOne; diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java index adf5218b..6b90111c 100644 --- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java @@ -78,7 +78,7 @@ public DefaultParallelismPass(final int desiredSourceParallelism, // No reason to propagate via Broadcast edges, as the data streams that will use the broadcasted data // as a sideInput will have their own number of parallelism final Integer o2oParallelism = inEdges.stream() - .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne + .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne .equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern))) .mapToInt(edge -> edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism)) .max().orElse(1); diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java index a89fa6a1..e9c256c8 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java @@ -79,7 +79,7 @@ public DefaultStagePartitioningPass() { final Optional<List<IREdge>> inEdgeList = (inEdges == null || inEdges.isEmpty()) ? Optional.empty() : Optional.of(inEdges); - if (!inEdgeList.isPresent() || inEdgeList.get().size() > 1) { // If Source vertex or has multiple inEdges + if (!inEdgeList.isPresent()) { // If Source vertex createNewStage(vertex, vertexStageNumHashMap, stageNumber, vertexListForEachStage); } else { // Filter candidate incoming edges that can be included in a stage with the vertex. 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java index 696df976..fae871a7 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java @@ -186,7 +186,8 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS irEdge.getId(), irEdge.getExecutionProperties(), irEdge.getSrc(), - irEdge.getDst())); + irEdge.getDst(), + irEdge.isSideInput())); } else { // edge comes from another stage final Stage srcStage = vertexStageMap.get(srcVertex); diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java index a2c52a7f..e1a11acc 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java @@ -28,22 +28,6 @@ private final ExecutionPropertyMap executionProperties; private final Boolean isSideInput; - /** - * Constructs the edge given the below parameters. - * This constructor assumes that this edge is not for a side input. - * - * @param runtimeEdgeId the id of this edge. - * @param executionProperties to control the data flow on this edge. - * @param src the source vertex. - * @param dst the destination vertex. - */ - public RuntimeEdge(final String runtimeEdgeId, - final ExecutionPropertyMap executionProperties, - final V src, - final V dst) { - this(runtimeEdgeId, executionProperties, src, dst, false); - } - /** * Constructs the edge given the below parameters. 
* diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java index 2abb3b7f..61a371f5 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java @@ -50,9 +50,8 @@ final InputReader readerForParentTask, final VertexHarness child, final Map<String, Object> metricMap, - final boolean isFromSideInput, final boolean isToSideInput) { - super(dataSource, child, metricMap, isFromSideInput, isToSideInput); + super(dataSource, child, metricMap, readerForParentTask.isSideInputReader(), isToSideInput); this.readersForParentTask = readerForParentTask; this.hasFetchStarted = false; this.dataQueue = new LinkedBlockingQueue<>(); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java index 998df63e..116b9c41 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java @@ -35,9 +35,8 @@ final Readable readable, final VertexHarness child, final Map<String, Object> metricMap, - final boolean isFromSideInput, final boolean isToSideInput) { - super(dataSource, child, metricMap, isFromSideInput, isToSideInput); + super(dataSource, child, metricMap, false, isToSideInput); this.readable = readable; } diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java index 764dddf6..f8ef425e 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java +++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java @@ -153,15 +153,14 @@ public TaskExecutor(final Task task, // Handle reads final boolean isToSideInput = isToSideInputs.stream().anyMatch(bool -> bool); if (irVertex instanceof SourceVertex) { - dataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), vertexHarness, metricMap, - false, isToSideInput)); // Source vertex read + dataFetcherList.add(new SourceVertexDataFetcher( + irVertex, sourceReader.get(), vertexHarness, metricMap, isToSideInput)); // Source vertex read } final List<InputReader> parentTaskReaders = getParentTaskReaders(taskIndex, irVertex, task.getTaskIncomingEdges(), dataTransferFactory); parentTaskReaders.forEach(parentTaskReader -> { - final boolean isFromSideInput = parentTaskReader.isSideInputReader(); dataFetcherList.add(new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, - vertexHarness, metricMap, isFromSideInput, isToSideInput)); // Parent-task read + vertexHarness, metricMap, isToSideInput)); // Parent-task read }); });
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services