Repository: kafka
Updated Branches:
  refs/heads/trunk dedacd06e -> f90321553


KAFKA-4049: Fix transient failure in RegexSourceIntegrationTest

Author: Guozhang Wang <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #1746 from guozhangwang/K4049-RegexSourceIntegrationTest-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9032155
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9032155
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9032155

Branch: refs/heads/trunk
Commit: f903215536af06b7b79739882d9286abc2e50000
Parents: dedacd0
Author: Guozhang Wang <[email protected]>
Authored: Mon Aug 22 16:09:02 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Aug 22 16:09:02 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/test/TestUtils.java   |  1 -
 .../integration/RegexSourceIntegrationTest.java | 62 +++++++-------------
 2 files changed, 21 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 4baa63b..44026be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -247,7 +247,6 @@ public class TestUtils {
     public static void waitForCondition(TestCondition testCondition, long 
maxWaitMs, String conditionDetails) throws InterruptedException {
         long startTime = System.currentTimeMillis();
 
-
         while (!testCondition.conditionMet() && ((System.currentTimeMillis() - 
startTime) < maxWaitMs)) {
             Thread.sleep(Math.min(maxWaitMs, 100L));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9032155/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0892893..51fa06a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -52,9 +52,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.regex.Pattern;
@@ -81,9 +79,6 @@ public class RegexSourceIntegrationTest {
     private static final String FA_TOPIC = "fa";
     private static final String FOO_TOPIC = "foo";
 
-    private static final int FIRST_UPDATE = 0;
-    private static final int SECOND_UPDATE = 1;
-
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = 
Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
@@ -121,6 +116,8 @@ public class RegexSourceIntegrationTest {
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
 
         final Serde<String> stringSerde = Serdes.String();
+        final List<String> expectedFirstAssignment = 
Arrays.asList("TEST-TOPIC-1");
+        final List<String> expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
         StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
@@ -146,41 +143,35 @@ public class RegexSourceIntegrationTest {
         TestCondition oneTopicAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = 
testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
-                return assignedTopics != null && 
assignedTopics.contains("TEST-TOPIC-1") && 
!assignedTopics.contains("TEST-TOPIC-2");
+                return 
testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
             }
         };
 
         streamThreads[0] = testStreamThread;
         streams.start();
 
-        TestUtils.waitForCondition(oneTopicAdded,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.createTopic("TEST-TOPIC-2");
 
         TestCondition secondTopicAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = 
testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
-                return assignedTopics != null && 
assignedTopics.contains("TEST-TOPIC-1") && 
assignedTopics.contains("TEST-TOPIC-2");
+                return 
testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
             }
         };
 
-        TestUtils.waitForCondition(secondTopicAdded,  
STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
-
-        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
-        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", 
"TEST-TOPIC-2");
-
-        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), 
equalTo(expectedFirstAssignment));
-        
assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), 
equalTo(expectedSecondAssignment));
     }
 
     @Test
     public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
 
         final Serde<String> stringSerde = Serdes.String();
+        final List<String> expectedFirstAssignment = 
Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
+        final List<String> expectedSecondAssignment = 
Arrays.asList("TEST-TOPIC-B");
 
         StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
 
@@ -209,34 +200,25 @@ public class RegexSourceIntegrationTest {
         TestCondition bothTopicsAdded  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = 
testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE);
-                return assignedTopics != null && 
assignedTopics.contains("TEST-TOPIC-A") && 
assignedTopics.contains("TEST-TOPIC-B");
+                return 
testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
             }
         };
         streams.start();
 
-        TestUtils.waitForCondition(bothTopicsAdded,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);
 
         CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-
         TestCondition oneTopicRemoved  = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                List<String> assignedTopics = 
testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE);
-                return assignedTopics != null && 
!assignedTopics.contains("TEST-TOPIC-A") && 
assignedTopics.contains("TEST-TOPIC-B");
+                return 
testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
             }
         };
 
-        TestUtils.waitForCondition(oneTopicRemoved,  STREAM_TASKS_NOT_UPDATED);
+        TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
 
         streams.close();
-
-        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", 
"TEST-TOPIC-B");
-        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
-
-        assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), 
equalTo(expectedFirstAssignment));
-        
assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), 
equalTo(expectedSecondAssignment));
     }
 
 
@@ -291,7 +273,7 @@ public class RegexSourceIntegrationTest {
         assertThat(actualValues, equalTo(expectedReceivedValues));
     }
 
-    //TODO should be updated to expected = TopologyBuilderException after 
KAFKA-3708
+    // TODO should be updated to expected = TopologyBuilderException after 
KAFKA-3708
     @Test(expected = AssertionError.class)
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws 
Exception {
 
@@ -304,8 +286,8 @@ public class RegexSourceIntegrationTest {
         KStreamBuilder builder = new KStreamBuilder();
 
 
-        //  overlapping patterns here, no messages should be sent as 
TopologyBuilderException
-        //  will be thrown when the processor topology is built.
+        // overlapping patterns here, no messages should be sent as 
TopologyBuilderException
+        // will be thrown when the processor topology is built.
 
         KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("foo.*"));
         KStream<String, String> pattern2Stream = 
builder.stream(Pattern.compile("f.*"));
@@ -334,9 +316,7 @@ public class RegexSourceIntegrationTest {
     }
 
     private class TestStreamThread extends StreamThread {
-
-        public Map<Integer, List<String>> assignedTopicPartitions = new 
HashMap<>();
-        private int index =  0;
+        public volatile List<String> assignedTopicPartitions = new 
ArrayList<>();
 
         public TestStreamThread(TopologyBuilder builder, StreamsConfig config, 
KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID 
processId, Metrics metrics, Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, 
processId, metrics, time, new StreamsMetadataState(builder));
@@ -344,15 +324,15 @@ public class RegexSourceIntegrationTest {
 
         @Override
         public StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitions) {
-            List<String> assignedTopics = new ArrayList<>();
+            List<String> topicPartitions = new ArrayList<>();
             for (TopicPartition partition : partitions) {
-                assignedTopics.add(partition.topic());
+                topicPartitions.add(partition.topic());
             }
-            Collections.sort(assignedTopics);
-            assignedTopicPartitions.put(index++, assignedTopics);
+            Collections.sort(topicPartitions);
+
+            assignedTopicPartitions = topicPartitions;
             return super.createStreamTask(id, partitions);
         }
 
     }
-
 }

Reply via email to