johnyangk closed pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 1675e9c0b..3a81df7f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,17 +235,17 @@ under the License.
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>3.0.0</version>
                 <configuration>
-                    
<excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
-                    <outputDirectory>docs/apidocs</outputDirectory>
-                    <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
+                  
<excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
+                  <outputDirectory>docs/apidocs</outputDirectory>
+                  <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
                 </configuration>
                 <executions>
                     <execution>
-                        <id>aggregate</id>
-                        <goals>
-                            <goal>aggregate</goal>
-                        </goals>
-                        <phase>site</phase>
+                      <id>aggregate</id>
+                      <goals>
+                          <goal>aggregate</goal>
+                      </goals>
+                      <phase>site</phase>
                     </execution>
                     <execution>
                       <id>test-javadoc</id>
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
index ffbbe56b1..a433f3a2c 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -141,8 +141,8 @@ public void notifyMaster(final String runtimeEdgeId, final 
long srcTaskIndex) {
   /**
    * (SYNCHRONIZATION) Called by task threads.
    *
-   * @param runtimeEdge
-   * @param srcTaskIndex
+   * @param runtimeEdge runtime edge
+   * @param srcTaskIndex source task index
    * @return output contexts.
    */
   public List<ByteOutputContext> getOutputContexts(final RuntimeEdge 
runtimeEdge,
@@ -163,8 +163,8 @@ public Serializer getSerializer(final String runtimeEdgeId) 
{
   /**
    * (SYNCHRONIZATION) Called by network threads.
    *
-   * @param outputContext
-   * @throws InvalidProtocolBufferException
+   * @param outputContext output context
+   * @throws InvalidProtocolBufferException protobuf exception
    */
   public void onOutputContext(final ByteOutputContext outputContext) throws 
InvalidProtocolBufferException {
     final ControlMessage.PipeTransferContextDescriptor descriptor =
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 3f1bc9074..56c754038 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -34,6 +34,7 @@
 
   /**
    * It forwards output to the next operator.
+   * @param nextOperatorVertex next operator to emit data and watermark
    */
   public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
     this.nextOperatorVertex = nextOperatorVertex;
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
new file mode 100644
index 000000000..66fb7aa81
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * An interface for tracking input watermarks among multiple input streams.
+ * --edge 1--&gt;
+ * --edge 2--&gt;  watermarkManager --(emitWatermark)--&gt; nextOperator
+ * --edge 3--&gt;
+ */
+public interface InputWatermarkManager {
+
+  /**
+   * This tracks the minimum input watermark among multiple input streams.
+   * Ex)
+   * -- input stream1 (edge 1):  ---------- ts: 3 ------------------ts: 6
+   *                                                                 ^^^
+   *                                                              emit ts: 4 
(edge 2) watermark at this time
+   * -- input stream2 (edge 2):  ----------------- ts: 4------
+   *                                                 ^^^
+   *                                             emit ts: 3 (edge 1) watermark 
at this time
+   * -- input stream3 (edge 3):  ------- ts: 5 ---------------
+   * @param edgeIndex incoming edge index
+   * @param watermark watermark emitted from the edge
+   */
+  void trackAndEmitWatermarks(int edgeIndex, Watermark watermark);
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
new file mode 100644
index 000000000..91c7c55c9
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager 
{
+  private final List<Watermark> watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    super();
+    this.watermarks = new ArrayList<>(numEdges);
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks as min value because
+    // we should not emit watermark until all edges emit watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks.add(new Watermark(Long.MIN_VALUE));
+    }
+  }
+
+  private int findNextMinWatermarkIndex() {
+    int index = -1;
+    long timestamp = Long.MAX_VALUE;
+    for (int i = 0; i < watermarks.size(); i++) {
+      if (watermarks.get(i).getTimestamp() < timestamp) {
+        index = i;
+        timestamp = watermarks.get(i).getTimestamp();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark 
watermark) {
+    if (edgeIndex == minWatermarkIndex) {
+      // update min watermark
+      final Watermark prevMinWatermark = watermarks.get(minWatermarkIndex);
+      watermarks.set(minWatermarkIndex, watermark);
+       // find min watermark
+      minWatermarkIndex = findNextMinWatermarkIndex();
+      final Watermark minWatermark = watermarks.get(minWatermarkIndex);
+
+      if (minWatermark.getTimestamp() < prevMinWatermark.getTimestamp()) {
+        throw new IllegalStateException(
+          "The current min watermark is ahead of prev min: " + minWatermark + 
", " + prevMinWatermark);
+      }
+
+      if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
+        // Watermark timestamp progress!
+        // Emit the min watermark
+        nextOperator.getTransform().onWatermark(minWatermark);
+      }
+    } else {
+      // The recent watermark timestamp cannot be less than the previous one
+      // because watermark is monotonically increasing.
+      if (watermarks.get(edgeIndex).getTimestamp() > watermark.getTimestamp()) 
{
+        throw new IllegalStateException(
+          "The recent watermark timestamp cannot be less than the previous one 
"
+            + "because watermark is monotonically increasing.");
+      }
+      watermarks.set(edgeIndex, watermark);
+    }
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
new file mode 100644
index 000000000..1525261e3
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+
+/**
+ * Contains information for next operator:
+ * -- edgeIndex: the index of edge to next operator.
+ * -- nextOperator: next operator vertex
+ * -- watermarkManager: next operator's watermark manager
+ *
+ * ex)
+ * --edge (index 0)--&gt;
+ * --edge (index 1)--&gt;  watermarkManager --&gt; nextOperator
+ * --edge (index 2)--&gt;
+ */
+public final class NextIntraTaskOperatorInfo {
+
+  private final int edgeIndex;
+  private final OperatorVertex nextOperator;
+  private final InputWatermarkManager watermarkManager;
+
+  public NextIntraTaskOperatorInfo(final int edgeIndex,
+                                   final OperatorVertex nextOperator,
+                                   final InputWatermarkManager 
watermarkManager) {
+    this.edgeIndex = edgeIndex;
+    this.nextOperator = nextOperator;
+    this.watermarkManager = watermarkManager;
+  }
+
+  public int getEdgeIndex() {
+    return edgeIndex;
+  }
+
+  public OperatorVertex getNextOperator() {
+    return nextOperator;
+  }
+
+  public InputWatermarkManager getWatermarkManager() {
+    return watermarkManager;
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 598cc35ad..3637780c1 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -41,8 +41,8 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
 
   private final IRVertex irVertex;
-  private final List<OperatorVertex> internalMainOutputs;
-  private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+  private final List<NextIntraTaskOperatorInfo> internalMainOutputs;
+  private final Map<String, List<NextIntraTaskOperatorInfo>> 
internalAdditionalOutputs;
   private final List<OutputWriter> externalMainOutputs;
   private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
 
@@ -54,11 +54,12 @@
    * @param externalMainOutputs external main outputs
    * @param externalAdditionalOutputs external additional outputs
    */
-  public OperatorVertexOutputCollector(final IRVertex irVertex,
-                                       final List<OperatorVertex> 
internalMainOutputs,
-                                       final Map<String, List<OperatorVertex>> 
internalAdditionalOutputs,
-                                       final List<OutputWriter> 
externalMainOutputs,
-                                       final Map<String, List<OutputWriter>> 
externalAdditionalOutputs) {
+  public OperatorVertexOutputCollector(
+    final IRVertex irVertex,
+    final List<NextIntraTaskOperatorInfo> internalMainOutputs,
+    final Map<String, List<NextIntraTaskOperatorInfo>> 
internalAdditionalOutputs,
+    final List<OutputWriter> externalMainOutputs,
+    final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
     this.irVertex = irVertex;
     this.internalMainOutputs = internalMainOutputs;
     this.internalAdditionalOutputs = internalAdditionalOutputs;
@@ -76,8 +77,8 @@ private void emit(final OutputWriter writer, final O output) {
 
   @Override
   public void emit(final O output) {
-    for (final OperatorVertex internalVertex : internalMainOutputs) {
-      emit(internalVertex, output);
+    for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) 
{
+      emit(internalVertex.getNextOperator(), output);
     }
 
     for (final OutputWriter externalWriter : externalMainOutputs) {
@@ -89,8 +90,8 @@ public void emit(final O output) {
   public <T> void emit(final String dstVertexId, final T output) {
 
     if (internalAdditionalOutputs.containsKey(dstVertexId)) {
-      for (final OperatorVertex internalVertex : 
internalAdditionalOutputs.get(dstVertexId)) {
-        emit(internalVertex, (O) output);
+      for (final NextIntraTaskOperatorInfo internalVertex : 
internalAdditionalOutputs.get(dstVertexId)) {
+        emit(internalVertex.getNextOperator(), (O) output);
       }
     }
 
@@ -104,15 +105,13 @@ public void emit(final O output) {
   @Override
   public void emitWatermark(final Watermark watermark) {
     // Emit watermarks to internal vertices
-    // TODO #232: Implement InputWatermarkManager
-    // TODO #232: We should emit the minimum watermark among multiple input 
streams of Transform.
-    for (final OperatorVertex internalVertex : internalMainOutputs) {
-      internalVertex.getTransform().onWatermark(watermark);
+    for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) 
{
+      
internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(),
 watermark);
     }
 
-    for (final List<OperatorVertex> internalVertices : 
internalAdditionalOutputs.values()) {
-      for (final OperatorVertex internalVertex : internalVertices) {
-        internalVertex.getTransform().onWatermark(watermark);
+    for (final List<NextIntraTaskOperatorInfo> internalVertices : 
internalAdditionalOutputs.values()) {
+      for (final NextIntraTaskOperatorInfo internalVertex : internalVertices) {
+        
internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(),
 watermark);
       }
     }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
new file mode 100644
index 000000000..204bf22c2
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * This is a special implementation for single input data stream for 
optimization.
+ */
+public final class SingleInputWatermarkManager implements 
InputWatermarkManager {
+
+  private final OperatorVertex nextOperator;
+
+  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
+    this.nextOperator = nextOperator;
+  }
+
+  /**
+   * This just forwards watermarks to the next operator because it has one 
data stream.
+   * @param edgeIndex edge index
+   * @param watermark watermark
+   */
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex,
+                                     final Watermark watermark) {
+    nextOperator.getTransform().onWatermark(watermark);
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 1541792b9..8c924431b 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,7 @@
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import 
org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -154,6 +155,37 @@ public TaskExecutor(final Task task,
     // Traverse in a reverse-topological order to ensure that each visited 
vertex's children vertices exist.
     final List<IRVertex> reverseTopologicallySorted = 
Lists.reverse(irVertexDag.getTopologicalSort());
 
+    // Build a map for edge as a key and edge index as a value
+    // This variable is used for creating NextIntraTaskOperatorInfo
+    // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+    reverseTopologicallySorted.forEach(childVertex -> {
+      final List<RuntimeEdge<IRVertex>> edges = 
irVertexDag.getIncomingEdgesOf(childVertex);
+      for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
+        final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+        edgeIndexMap.putIfAbsent(edge, edgeIndex);
+      }
+    });
+
+    // Build a map for InputWatermarkManager for each operator vertex
+    // This variable is used for creating NextIntraTaskOperatorInfo
+    // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = 
new HashMap<>();
+    reverseTopologicallySorted.forEach(childVertex -> {
+
+      if (childVertex instanceof OperatorVertex) {
+        final List<RuntimeEdge<IRVertex>> edges = 
irVertexDag.getIncomingEdgesOf(childVertex);
+        if (edges.size() == 1) {
+          operatorWatermarkManagerMap.putIfAbsent(childVertex,
+            new SingleInputWatermarkManager((OperatorVertex) childVertex));
+        } else {
+          operatorWatermarkManagerMap.putIfAbsent(childVertex,
+            new MultiInputWatermarkManager(edges.size(), (OperatorVertex) 
childVertex));
+        }
+      }
+
+    });
+
     // Create a harness for each vertex
     final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
@@ -165,13 +197,14 @@ public TaskExecutor(final Task task,
       }
 
       // Additional outputs
-      final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
-        getInternalAdditionalOutputMap(irVertex, irVertexDag);
+      final Map<String, List<NextIntraTaskOperatorInfo>> 
internalAdditionalOutputMap =
+        getInternalAdditionalOutputMap(irVertex, irVertexDag, edgeIndexMap, 
operatorWatermarkManagerMap);
       final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
         getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), 
intermediateDataIOFactory);
 
       // Main outputs
-      final List<OperatorVertex> internalMainOutputs = 
getInternalMainOutputs(irVertex, irVertexDag);
+      final List<NextIntraTaskOperatorInfo> internalMainOutputs =
+        getInternalMainOutputs(irVertex, irVertexDag, edgeIndexMap, 
operatorWatermarkManagerMap);
       final List<OutputWriter> externalMainOutputs =
         getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), 
intermediateDataIOFactory);
 
@@ -492,17 +525,25 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
     return map;
   }
 
-  private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+  private Map<String, List<NextIntraTaskOperatorInfo>> 
getInternalAdditionalOutputMap(
     final IRVertex irVertex,
-    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
     // Add all intra-task additional tags to additional output map.
-    final Map<String, List<OperatorVertex>> map = new HashMap<>();
+    final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
 
     irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
       .filter(edge -> 
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge ->
-        
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), 
(OperatorVertex) edge.getDst()))
+      .map(edge -> {
+          final String outputTag = 
edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+          final int index = edgeIndexMap.get(edge);
+          final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+          final InputWatermarkManager inputWatermarkManager = 
operatorWatermarkManagerMap.get(nextOperator);
+          return Pair.of(outputTag, new NextIntraTaskOperatorInfo(index, 
nextOperator, inputWatermarkManager));
+        })
       .forEach(pair -> {
         map.putIfAbsent(pair.left(), new ArrayList<>());
         map.get(pair.left()).add(pair.right());
@@ -511,12 +552,22 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
     return map;
   }
 
-  private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
-                                                      final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag) {
+  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+  private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
+    final IRVertex irVertex,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
+
     return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
       .filter(edge -> 
!edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge -> (OperatorVertex) edge.getDst())
+      .map(edge -> {
+        final int index = edgeIndexMap.get(edge);
+        final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+        final InputWatermarkManager inputWatermarkManager = 
operatorWatermarkManagerMap.get(nextOperator);
+        return new NextIntraTaskOperatorInfo(index, nextOperator, 
inputWatermarkManager);
+      })
       .collect(Collectors.toList());
   }
 
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
new file mode 100644
index 000000000..9303da832
--- /dev/null
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+    final List<Watermark> emittedWatermarks = new LinkedList<>();
+    final Transform transform = mock(Transform.class);
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        final Watermark watermark = invocationOnMock.getArgument(0);
+        emittedWatermarks.add(watermark);
+        return null;
+      }
+    }).when(transform).onWatermark(any(Watermark.class));
+
+    final OperatorVertex operatorVertex = new OperatorVertex(transform);
+    final InputWatermarkManager watermarkManager =
+      new MultiInputWatermarkManager(3, operatorVertex);
+
+    //edge1: 10 s
+    //edge2: 5 s
+    //edge3: 8 s
+    //current min watermark: 5 s
+    watermarkManager.trackAndEmitWatermarks(0, new Watermark(10));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(5));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(2, new Watermark(8));
+    assertEquals(5, emittedWatermarks.get(0).getTimestamp());
+    emittedWatermarks.clear();
+
+    //edge1: 13
+    //edge2: 9
+    //edge3: 8
+    //current min watermark: 8
+    watermarkManager.trackAndEmitWatermarks(0, new Watermark(13));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(9));
+    assertEquals(8, emittedWatermarks.get(0).getTimestamp());
+    emittedWatermarks.clear();
+
+    //edge1: 13
+    //edge2: 15
+    //edge3: 8
+    //current min watermark: 8
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(15));
+    assertEquals(0, emittedWatermarks.size());
+
+    //edge1: 13
+    //edge2: 15
+    //edge3: 17
+    //current min watermark: 13
+    watermarkManager.trackAndEmitWatermarks(2, new Watermark(17));
+    assertEquals(13, emittedWatermarks.get(0).getTimestamp());
+  }
+}
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 823366323..e05bbfb81 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -61,7 +61,10 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -192,78 +195,11 @@ public void close() throws IOException {
    */
   @Test()
   public void testUnboundedSourceVertexDataFetching() throws Exception {
-    final IRVertex sourceIRVertex = new SourceVertex() {
-      @Override
-      public IRVertex getClone() {
-        return this;
-      }
-
-      @Override
-      public boolean isBounded() {
-        return false;
-      }
-
-      @Override
-      public List<Readable> getReadables(int desiredNumOfSplits) throws 
Exception {
-        return null;
-      }
-
-      @Override
-      public void clearInternalStates() {
-
-      }
-    };
-
-    final long watermark = 1234567L;
-
-    final Readable readable = new Readable() {
-      int pointer = 0;
-      final int middle = elements.size() / 2;
-      final int end = elements.size();
-      boolean watermarkEmitted = false;
-
-      @Override
-      public void prepare() {
-
-      }
-
-      // This emulates unbounded source that throws NoSuchElementException
-      // It reads current data until middle point and  throws 
NoSuchElementException at the middle point.
-      // It resumes the data reading after emitting a watermark, and finishes 
at the end of the data.
-      @Override
-      public Object readCurrent() throws NoSuchElementException {
-        if (pointer == middle && !watermarkEmitted) {
-          throw new NoSuchElementException();
-        }
-
-        return elements.get(pointer);
-      }
-
-      @Override
-      public void advance() throws IOException {
-        pointer += 1;
-      }
-
-      @Override
-      public long readWatermark() {
-        watermarkEmitted = true;
-        return watermark;
-      }
-
-      @Override
-      public boolean isFinished() {
-        return pointer == end;
-      }
-
-      @Override
-      public List<String> getLocations() throws Exception {
-        return null;
-      }
-
-      @Override
-      public void close() throws IOException {
-      }
-    };
+    final IRVertex sourceIRVertex = new TestUnboundedSourceVertex();
+    final Long watermark = 1234567L;
+    final BlockingQueue<Long> watermarkQueue = new LinkedBlockingQueue<>();
+    watermarkQueue.add(watermark);
+    final Readable readable = new TestUnboundedSourceReadable(watermarkQueue, 
1);
 
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
@@ -330,6 +266,108 @@ public void testParentTaskDataFetching() throws Exception 
{
     assertTrue(checkEqualElements(elements, 
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+  private void waitUntilWatermarkEmitted(final Queue<Long> watermarkQueue) {
+    while (!watermarkQueue.isEmpty()) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * The DAG of the task to test will looks like:
+   * source1 -> vertex1 -> vertex2
+   * source2 -> vertex3 ->
+   *
+   * The vertex2 has two incoming edges (from vertex1 and vertex3)
+   * and we test if TaskExecutor handles data and watermarks correctly in this 
situation.
+   *
+   * source1 emits watermarks:     500 (ts)  600 (ts)   1400 (ts)  1800 (ts)   
     2500 (ts)
+   * source2 emits watermarks:  1000(ts)                                     
2200 (ts)
+   *
+   * The vertex2 should receive and emits watermarks 500, 600, 1000, 1800, and 
2200
+   */
+  @Test()
+  public void testMultipleIncomingEdges() throws Exception {
+    final List<Watermark> emittedWatermarks = new ArrayList<>();
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new 
RelayTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new 
RelayTransformNoWatermarkEmit(emittedWatermarks));
+    final IRVertex operatorIRVertex3 = new OperatorVertex(new 
RelayTransform());
+
+    final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
+    final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
+
+    final Queue<Long> watermarks1 = new ConcurrentLinkedQueue<>();
+    watermarks1.add(500L);
+    final Queue<Long> watermarks2 = new ConcurrentLinkedQueue<>();
+    watermarks2.add(1000L);
+    final Readable readable1 = new TestUnboundedSourceReadable(watermarks1, 5);
+    final Readable readable2 = new TestUnboundedSourceReadable(watermarks2, 2);
+
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex1.getId(), readable1);
+    vertexIdToReadable.put(sourceIRVertex2.getId(), readable2);
+
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+      new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(sourceIRVertex1)
+        .addVertex(sourceIRVertex2)
+        .addVertex(operatorIRVertex1)
+        .addVertex(operatorIRVertex2)
+        .addVertex(operatorIRVertex3)
+        .connectVertices(createEdge(sourceIRVertex1, operatorIRVertex1, 
"edge1"))
+        .connectVertices(createEdge(operatorIRVertex1, operatorIRVertex2, 
"edge2"))
+        .connectVertices(createEdge(sourceIRVertex2, operatorIRVertex3, 
"edge3"))
+        .connectVertices(createEdge(operatorIRVertex3, operatorIRVertex2, 
"edge4"))
+        .buildWithoutSourceSinkCheck();
+
+    final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+    final Task task =
+      new Task(
+        "testSourceVertexDataFetching",
+        generateTaskId(),
+        TASK_EXECUTION_PROPERTY_MAP,
+        new byte[0],
+        Collections.emptyList(),
+        Collections.singletonList(taskOutEdge),
+        vertexIdToReadable);
+
+    // Execute the task.
+    final Thread watermarkEmitThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        waitUntilWatermarkEmitted(watermarks2);
+        watermarks1.add(600L);
+        watermarks1.add(1400L);
+        watermarks1.add(1800L);
+        waitUntilWatermarkEmitted(watermarks1);
+        watermarks2.add(2200L);
+        waitUntilWatermarkEmitted(watermarks2);
+        watermarks1.add(2500L);
+        waitUntilWatermarkEmitted(watermarks1);
+      }
+    });
+
+    watermarkEmitThread.start();
+    final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+    taskExecutor.execute();
+
+    watermarkEmitThread.join();
+
+    // Check whether the watermark is emitted
+    assertEquals(Arrays.asList(
+      new Watermark(500), new Watermark(600), new Watermark(1000),
+      new Watermark(1800), new Watermark(2200)), emittedWatermarks);
+
+    // Check the output.
+    final List<Integer> doubledElements = new ArrayList<>(elements.size()*2);
+    doubledElements.addAll(elements);
+    doubledElements.addAll(elements);
+    assertTrue(checkEqualElements(doubledElements, 
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+  }
+
   /**
    * The DAG of the task to test will looks like:
    * parent task -> task (vertex 1 -> task 2) -> child task
@@ -601,6 +639,98 @@ public void close() {
     }
   }
 
+  /**
+   * Source vertex for unbounded source test.
+   */
+  private final class TestUnboundedSourceVertex extends SourceVertex {
+
+    @Override
+    public boolean isBounded() {
+      return false;
+    }
+
+    @Override
+    public List<Readable> getReadables(int desiredNumOfSplits) throws 
Exception {
+      return null;
+    }
+
+    @Override
+    public void clearInternalStates() {
+
+    }
+
+    @Override
+    public IRVertex getClone() {
+      return null;
+    }
+  }
+
+
+  // This emulates unbounded source that throws NoSuchElementException
+  // It reads current data until middle point and throws 
NoSuchElementException at the middle point.
+  // It resumes the data reading after emitting a watermark, and finishes at 
the end of the data.
+  private final class TestUnboundedSourceReadable implements Readable {
+    int pointer = 0;
+    final int middle = elements.size() / 2;
+    final int end = elements.size();
+    final Queue<Long> watermarks;
+    int numEmittedWatermarks = 0;
+    final int expectedNumWatermarks;
+    long currWatermark = -1;
+
+    public TestUnboundedSourceReadable(final Queue<Long> watermarks,
+                                       final int expectedNumWatermarks) {
+      this.watermarks = watermarks;
+      this.expectedNumWatermarks = expectedNumWatermarks;
+    }
+
+    @Override
+    public void prepare() {
+
+    }
+
+    @Override
+    public Object readCurrent() throws NoSuchElementException {
+      if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
+        throw new NoSuchElementException();
+      }
+      return elements.get(pointer);
+    }
+
+    @Override
+    public void advance() throws IOException {
+      pointer += 1;
+    }
+
+    @Override
+    public long readWatermark() {
+      if (numEmittedWatermarks >= expectedNumWatermarks) {
+        return Long.MAX_VALUE;
+      }
+
+      final Long watermark = watermarks.poll();
+      if (watermark == null) {
+        return currWatermark;
+      }
+      currWatermark = watermark;
+      numEmittedWatermarks += 1;
+      return watermark;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return pointer == end;
+    }
+
+    @Override
+    public List<String> getLocations() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
 
   /**
    * Simple identity function for testing.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to