Repository: beam Updated Branches: refs/heads/master 1197bef19 -> b414f8de9
[BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics Gauge results are flaky on Jenkins, instead of asserting on value assert on the gauge's existence instead. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dac56f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dac56f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dac56f7 Branch: refs/heads/master Commit: 5dac56f793c4851bca78dc6f4b4a70d34a016448 Parents: 1197bef Author: Aviem Zur <aviem...@gmail.com> Authored: Mon May 1 07:53:39 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Mon May 1 10:50:23 2017 +0300 ---------------------------------------------------------------------- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 41 +++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5dac56f7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 591c099..ccbd3d6 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.kafka; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; @@ -99,6 +98,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Utils; import org.hamcrest.collection.IsIterableContainingInAnyOrder; +import org.hamcrest.collection.IsIterableWithSize; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -627,7 +627,6 @@ public class KafkaIOTest { MetricsFilter.builder().build()); Iterable<MetricResult<Long>> counters = metrics.counters(); - Iterable<MetricResult<GaugeResult>> gauges = metrics.gauges(); assertThat(counters, hasItem( MetricMatchers.attemptedMetricsResult( @@ -657,19 +656,31 @@ public class KafkaIOTest { readStep, 12000L))); - assertThat(gauges, hasItem( - attemptedMetricsResult( - backlogElementsOfSplit.namespace(), - backlogElementsOfSplit.name(), - readStep, - GaugeResult.create(0L, Instant.now())))); - - assertThat(gauges, hasItem( - attemptedMetricsResult( - backlogBytesOfSplit.namespace(), - backlogBytesOfSplit.name(), - readStep, - GaugeResult.create(0L, Instant.now())))); + MetricQueryResults backlogElementsMetrics = + result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + backlogElementsOfSplit.namespace(), + backlogElementsOfSplit.name())) + .build()); + + // since gauge values may be inconsistent in some environments assert only on their existence. + assertThat(backlogElementsMetrics.gauges(), + IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1)); + + MetricQueryResults backlogBytesMetrics = + result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + backlogBytesOfSplit.namespace(), + backlogBytesOfSplit.name())) + .build()); + + // since gauge values may be inconsistent in some environments assert only on their existence. + assertThat(backlogBytesMetrics.gauges(), + IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1)); } @Test