Change counter name in TestDataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b055d2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b055d2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b055d2d Branch: refs/heads/gearpump-runner Commit: 6b055d2debe879816808b4c1ee847e34cc1df5c0 Parents: 1ee191f Author: Joshua Litt <joshual...@google.com> Authored: Sat Dec 17 11:12:12 2016 -0800 Committer: Joshua Litt <joshual...@google.com> Committed: Sat Dec 17 11:12:12 2016 -0800 ---------------------------------------------------------------------- .../dataflow/testing/TestDataflowRunner.java | 29 ++++++++++++++++---- .../testing/TestDataflowRunnerTest.java | 16 ++++++++++- 2 files changed, 39 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 4b0fcf2..0564448 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -61,7 +61,12 @@ import org.slf4j.LoggerFactory; */ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static final String TENTATIVE_COUNTER = "tentative"; - private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + // See https://issues.apache.org/jira/browse/BEAM-1170 + // we 
need to either fix the API or pipe the DRAINED signal through + @VisibleForTesting + static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + @VisibleForTesting + static final String WATERMARK_METRIC_SUFFIX = "DataWatermark"; private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); @@ -248,6 +253,23 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** + * Checks whether a metric is a streaming watermark. + * + * @return true if the metric is a watermark. + */ + boolean isWatermark(MetricUpdate metric) { + if (metric.getName() == null || metric.getName().getName() == null) { + return false; // no name -> shouldn't happen, not the watermark + } + if (metric.getScalar() == null) { + return false; // no scalar value -> not the watermark + } + String name = metric.getName().getName(); + return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX) + || name.endsWith(WATERMARK_METRIC_SUFFIX); + } + + /** * Check watermarks of the streaming job. At least one watermark metric must exist. * * @return true if all watermarks are at max, false otherwise. 
@@ -256,10 +278,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) { boolean hasMaxWatermark = false; for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null - || metric.getName().getName() == null - || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX) - || metric.getScalar() == null) { + if (!isWatermark(metric)) { continue; } BigDecimal watermark = (BigDecimal) metric.getScalar(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 366c6a1..da5630b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.testing; +import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX; +import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -95,7 +97,6 @@ public class TestDataflowRunnerTest { @Mock private MockLowLevelHttpRequest request; @Mock private GcsUtil mockGcsUtil; - private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; private static final BigDecimal 
DEFAULT_MAX_WATERMARK = new BigDecimal(-2); private TestDataflowPipelineOptions options; @@ -411,6 +412,19 @@ public class TestDataflowRunnerTest { } @Test + public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException { + DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + doReturn(State.RUNNING).when(job).getState(); + assertTrue(runner.atMaxWatermark(job, metrics)); + } + + @Test public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); Pipeline p = TestPipeline.create(options);