jeongyooneo closed pull request #38: [NEMO-48] Do not merge broadcasted data into a single partition
URL: https://github.com/apache/incubator-nemo/pull/38
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff
is reproduced below for provenance:

Because this pull request comes from a fork, GitHub would not otherwise display
its diff, so the diff is supplied below:

diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java 
b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index 3b586374..2d90388d 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -16,8 +16,6 @@
 package edu.snu.nemo.common.dag;
 
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-//import 
edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -243,15 +241,6 @@ private void sinkCheck() {
    * Helper method to check that all execution properties are correct and 
makes sense.
    */
   private void executionPropertyCheck() {
-    // SideInput edge must be broadcast
-    vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e 
instanceof IREdge).map(e -> (IREdge) e)
-        .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
-        .filter(e -> 
!(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern))
-            .equals(DataCommunicationPatternProperty.Value.BroadCast))
-        .forEach(e -> {
-          throw new RuntimeException("DAG execution property check: "
-              + "SideInput edge must be broadcast: " + e.getId());
-        }));
     // SideInput is not compatible with Push
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e 
instanceof IREdge).map(e -> (IREdge) e)
         .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index e3072855..bf3253be 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -263,7 +263,7 @@ private static BeamCoder getCoderForView(final ViewFn 
viewFn, final Coder beamIn
     if (dstTransform instanceof GroupByKeyTransform) {
       return DataCommunicationPatternProperty.Value.Shuffle;
     }
-    if (srcTransform instanceof CreateViewTransform || dstTransform instanceof 
CreateViewTransform) {
+    if (dstTransform instanceof CreateViewTransform) {
       return DataCommunicationPatternProperty.Value.BroadCast;
     }
     return DataCommunicationPatternProperty.Value.OneToOne;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
index adf5218b..6b90111c 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
@@ -78,7 +78,7 @@ public DefaultParallelismPass(final int 
desiredSourceParallelism,
           // No reason to propagate via Broadcast edges, as the data streams 
that will use the broadcasted data
           // as a sideInput will have their own number of parallelism
           final Integer o2oParallelism = inEdges.stream()
-             .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
+              .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
                   
.equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
               .mapToInt(edge -> 
edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism))
               .max().orElse(1);
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
index a89fa6a1..e9c256c8 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
@@ -79,7 +79,7 @@ public DefaultStagePartitioningPass() {
       final Optional<List<IREdge>> inEdgeList = (inEdges == null || 
inEdges.isEmpty())
               ? Optional.empty() : Optional.of(inEdges);
 
-      if (!inEdgeList.isPresent() || inEdgeList.get().size() > 1) { // If 
Source vertex or has multiple inEdges
+      if (!inEdgeList.isPresent()) { // If Source vertex
         createNewStage(vertex, vertexStageNumHashMap, stageNumber, 
vertexListForEachStage);
       } else {
         // Filter candidate incoming edges that can be included in a stage 
with the vertex.
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 696df976..fae871a7 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -186,7 +186,8 @@ private void handleDuplicateEdgeGroupProperty(final 
DAG<Stage, StageEdge> dagOfS
                 irEdge.getId(),
                 irEdge.getExecutionProperties(),
                 irEdge.getSrc(),
-                irEdge.getDst()));
+                irEdge.getDst(),
+                irEdge.isSideInput()));
           } else { // edge comes from another stage
             final Stage srcStage = vertexStageMap.get(srcVertex);
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index a2c52a7f..e1a11acc 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -28,22 +28,6 @@
   private final ExecutionPropertyMap executionProperties;
   private final Boolean isSideInput;
 
-  /**
-   * Constructs the edge given the below parameters.
-   * This constructor assumes that this edge is not for a side input.
-   *
-   * @param runtimeEdgeId       the id of this edge.
-   * @param executionProperties to control the data flow on this edge.
-   * @param src                 the source vertex.
-   * @param dst                 the destination vertex.
-   */
-  public RuntimeEdge(final String runtimeEdgeId,
-                     final ExecutionPropertyMap executionProperties,
-                     final V src,
-                     final V dst) {
-    this(runtimeEdgeId, executionProperties, src, dst, false);
-  }
-
   /**
    * Constructs the edge given the below parameters.
    *
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 2abb3b7f..61a371f5 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -50,9 +50,8 @@
                         final InputReader readerForParentTask,
                         final VertexHarness child,
                         final Map<String, Object> metricMap,
-                        final boolean isFromSideInput,
                         final boolean isToSideInput) {
-    super(dataSource, child, metricMap, isFromSideInput, isToSideInput);
+    super(dataSource, child, metricMap, 
readerForParentTask.isSideInputReader(), isToSideInput);
     this.readersForParentTask = readerForParentTask;
     this.hasFetchStarted = false;
     this.dataQueue = new LinkedBlockingQueue<>();
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 998df63e..116b9c41 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -35,9 +35,8 @@
                           final Readable readable,
                           final VertexHarness child,
                           final Map<String, Object> metricMap,
-                          final boolean isFromSideInput,
                           final boolean isToSideInput) {
-    super(dataSource, child, metricMap, isFromSideInput, isToSideInput);
+    super(dataSource, child, metricMap, false, isToSideInput);
     this.readable = readable;
   }
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 764dddf6..f8ef425e 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -153,15 +153,14 @@ public TaskExecutor(final Task task,
       // Handle reads
       final boolean isToSideInput = isToSideInputs.stream().anyMatch(bool -> 
bool);
       if (irVertex instanceof SourceVertex) {
-        dataFetcherList.add(new SourceVertexDataFetcher(irVertex, 
sourceReader.get(), vertexHarness, metricMap,
-            false, isToSideInput)); // Source vertex read
+        dataFetcherList.add(new SourceVertexDataFetcher(
+            irVertex, sourceReader.get(), vertexHarness, metricMap, 
isToSideInput)); // Source vertex read
       }
       final List<InputReader> parentTaskReaders =
           getParentTaskReaders(taskIndex, irVertex, 
task.getTaskIncomingEdges(), dataTransferFactory);
       parentTaskReaders.forEach(parentTaskReader -> {
-        final boolean isFromSideInput = parentTaskReader.isSideInputReader();
         dataFetcherList.add(new 
ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-            vertexHarness, metricMap, isFromSideInput, isToSideInput)); // 
Parent-task read
+            vertexHarness, metricMap, isToSideInput)); // Parent-task read
       });
     });
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to navigate to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to