Repository: kafka Updated Branches: refs/heads/trunk 6cf2cb6f2 -> eb59c8124
MINOR: fix transient QueryableStateIntegration test failure The verification in verifyGreaterOrEqual was incorrect. It was failing when a new key was found. Set the TimeWindow to a large value so all windowed results fall in a single window Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #1833 from dguy/minor-test-fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb59c812 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb59c812 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb59c812 Branch: refs/heads/trunk Commit: eb59c8124e41fce176bfa6087d5aa4dce18f3642 Parents: 6cf2cb6 Author: Damian Guy <damian....@gmail.com> Authored: Thu Sep 8 10:53:29 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Sep 8 10:53:29 2016 -0700 ---------------------------------------------------------------------- .../QueryableStateIntegrationTest.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/eb59c812/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 496644f..63bb081 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; @@ -82,7 +83,8 @@ public class QueryableStateIntegrationTest { private static final String STREAM_THREE = "stream-three"; private static final int NUM_PARTITIONS = NUM_BROKERS; private static final int NUM_REPLICAS = NUM_BROKERS; - private static final long WINDOW_SIZE = 60000L; + // sufficiently large window size such that everything falls into 1 window + private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS); private static final String OUTPUT_TOPIC_THREE = "output-three"; private Properties streamsConfiguration; private List<String> inputValues; @@ -582,10 +584,8 @@ public class QueryableStateIntegrationTest { final Long value = keyValueStore.get(key); if (value != null) { countState.put(key, value); - } else { - if (failIfKeyNotFound) { - fail("Key not found " + key); - } + } else if (failIfKeyNotFound) { + fail("Key not found " + key); } } @@ -593,10 +593,6 @@ public class QueryableStateIntegrationTest { if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) { final Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey()); assertTrue(actualWindowStateEntry.getValue() >= expectedValue); - } else { - if (failIfKeyNotFound) { - fail("Key not found " + actualWindowStateEntry.getKey()); - } } // return this for next round of comparisons expectedWindowedCount.put(actualWindowStateEntry.getKey(), actualWindowStateEntry.getValue()); @@ -606,10 +602,6 @@ public class QueryableStateIntegrationTest { if (expectedCount.containsKey(actualCountStateEntry.getKey())) { final Long expectedValue = expectedCount.get(actualCountStateEntry.getKey()); assertTrue(actualCountStateEntry.getValue() >= expectedValue); - } else { - if (failIfKeyNotFound) { - fail("Key not found " + actualCountStateEntry.getKey()); - } } // return this for next round of comparisons expectedCount.put(actualCountStateEntry.getKey(), actualCountStateEntry.getValue());