Change counter name in TestDataflowRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2f44923
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2f44923
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2f44923

Branch: refs/heads/gearpump-runner
Commit: a2f44923c8a8db533ad0bb6f545a96cf5007cfa5
Parents: d13f11f
Author: Joshua Litt <joshual...@google.com>
Authored: Sat Dec 17 11:12:12 2016 -0800
Committer: bchambers <bchamb...@google.com>
Committed: Mon Dec 19 12:28:19 2016 -0800

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 29 ++++++++++++++++----
 .../testing/TestDataflowRunnerTest.java         | 16 ++++++++++-
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2f44923/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 4b0fcf2..0564448 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -61,7 +61,12 @@ import org.slf4j.LoggerFactory;
  */
 public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
-  private static final String WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark";
+  // See https://issues.apache.org/jira/browse/BEAM-1170
+  // we need to either fix the API or pipe the DRAINED signal through
+  @VisibleForTesting
+  static final String LEGACY_WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark";
+  @VisibleForTesting
+  static final String WATERMARK_METRIC_SUFFIX = "DataWatermark";
   private static final long MAX_WATERMARK_VALUE = -2L;
   private static final Logger LOG = 
LoggerFactory.getLogger(TestDataflowRunner.class);
 
@@ -248,6 +253,23 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
+   * Checks whether a metric is a streaming watermark.
+   *
+   * @return true if the metric is a watermark.
+   */
+  boolean isWatermark(MetricUpdate metric) {
+    if (metric.getName() == null || metric.getName().getName() == null) {
+      return false; // no name -> shouldn't happen, not the watermark
+    }
+    if (metric.getScalar() == null) {
+      return false; // no scalar value -> not the watermark
+    }
+    String name = metric.getName().getName();
+    return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX)
+        || name.endsWith(WATERMARK_METRIC_SUFFIX);
+  }
+
+  /**
    * Check watermarks of the streaming job. At least one watermark metric must 
exist.
    *
    * @return true if all watermarks are at max, false otherwise.
@@ -256,10 +278,7 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) {
     boolean hasMaxWatermark = false;
     for (MetricUpdate metric : metrics.getMetrics()) {
-      if (metric.getName() == null
-          || metric.getName().getName() == null
-          || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
-          || metric.getScalar() == null) {
+      if (!isWatermark(metric)) {
         continue;
       }
       BigDecimal watermark = (BigDecimal) metric.getScalar();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2f44923/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 366c6a1..da5630b 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.testing;
 
+import static 
org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX;
+import static 
org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -95,7 +97,6 @@ public class TestDataflowRunnerTest {
   @Mock private MockLowLevelHttpRequest request;
   @Mock private GcsUtil mockGcsUtil;
 
-  private static final String WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark";
   private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2);
 
   private TestDataflowPipelineOptions options;
@@ -411,6 +412,19 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
+  public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws 
IOException {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, 
DEFAULT_MAX_WATERMARK)));
+    doReturn(State.RUNNING).when(job).getState();
+    assertTrue(runner.atMaxWatermark(job, metrics));
+  }
+
+  @Test
   public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws 
IOException {
     DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
     Pipeline p = TestPipeline.create(options);

Reply via email to