johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229599612
 
 

 ##########
 File path: 
runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 ##########
 @@ -601,6 +600,91 @@ public void close() {
     }
   }
 
+  /**
+   * Source vertex for unbounded source test.
+   */
+  private final class TestUnboundedSourceVertex extends SourceVertex {
+
+    @Override
+    public boolean isBounded() {
+      return false;
+    }
+
+    @Override
+    public List<Readable> getReadables(int desiredNumOfSplits) throws 
Exception {
+      return null;
+    }
+
+    @Override
+    public void clearInternalStates() {
+
+    }
+
+    @Override
+    public IRVertex getClone() {
+      return null;
+    }
+  }
+
+  private final class TestUnboundedSourceReadable implements Readable {
+    int pointer = 0;
+    final int middle = elements.size() / 2;
+    final int end = elements.size();
+    boolean watermarkEmitted = false;
+    final List<Long> watermarks;
+    int numEmittedWatermarks = 0;
+
+    public TestUnboundedSourceReadable(final List<Long> watermarks) {
+      this.watermarks = watermarks;
+    }
+
+    @Override
+    public void prepare() {
+
+    }
+
+    // This emulates unbounded source that throws NoSuchElementException
+    // It reads current data until middle point and  throws 
NoSuchElementException at the middle point.
+    // It resumes the data reading after emitting a watermark, and finishes at 
the end of the data.
+    @Override
+    public Object readCurrent() throws NoSuchElementException {
+      if (pointer == middle && !watermarkEmitted) {
 
 Review comment:
   Can you explain a little bit more in detail (in the comment) why `! 
watermarkEmitted` is needed here?
   `readCurrent` and watermark seem independent operations to me.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to