Repository: beam
Updated Branches:
  refs/heads/master 1197bef19 -> b414f8de9


[BEAM-2129] Fix flaky KafkaIOTest#testUnboundedSourceMetrics

Gauge results are flaky on Jenkins, instead of asserting on value
assert on the gauge's existence instead.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dac56f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dac56f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dac56f7

Branch: refs/heads/master
Commit: 5dac56f793c4851bca78dc6f4b4a70d34a016448
Parents: 1197bef
Author: Aviem Zur <aviem...@gmail.com>
Authored: Mon May 1 07:53:39 2017 +0300
Committer: Aviem Zur <aviem...@gmail.com>
Committed: Mon May 1 10:50:23 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 41 +++++++++++++-------
 1 file changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5dac56f7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 591c099..ccbd3d6 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
-import static 
org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
@@ -99,6 +98,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.hamcrest.collection.IsIterableWithSize;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -627,7 +627,6 @@ public class KafkaIOTest {
         MetricsFilter.builder().build());
 
     Iterable<MetricResult<Long>> counters = metrics.counters();
-    Iterable<MetricResult<GaugeResult>> gauges = metrics.gauges();
 
     assertThat(counters, hasItem(
         MetricMatchers.attemptedMetricsResult(
@@ -657,19 +656,31 @@ public class KafkaIOTest {
             readStep,
             12000L)));
 
-    assertThat(gauges, hasItem(
-        attemptedMetricsResult(
-            backlogElementsOfSplit.namespace(),
-            backlogElementsOfSplit.name(),
-            readStep,
-            GaugeResult.create(0L, Instant.now()))));
-
-    assertThat(gauges, hasItem(
-        attemptedMetricsResult(
-            backlogBytesOfSplit.namespace(),
-            backlogBytesOfSplit.name(),
-            readStep,
-            GaugeResult.create(0L, Instant.now()))));
+    MetricQueryResults backlogElementsMetrics =
+        result.metrics().queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(
+                    MetricNameFilter.named(
+                        backlogElementsOfSplit.namespace(),
+                        backlogElementsOfSplit.name()))
+                .build());
+
+    // since gauge values may be inconsistent in some environments assert only 
on their existence.
+    assertThat(backlogElementsMetrics.gauges(),
+        IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1));
+
+    MetricQueryResults backlogBytesMetrics =
+        result.metrics().queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(
+                    MetricNameFilter.named(
+                        backlogBytesOfSplit.namespace(),
+                        backlogBytesOfSplit.name()))
+                .build());
+
+    // since gauge values may be inconsistent in some environments assert only 
on their existence.
+    assertThat(backlogBytesMetrics.gauges(),
+        IsIterableWithSize.<MetricResult<GaugeResult>>iterableWithSize(1));
   }
 
   @Test

Reply via email to