Repository: kafka Updated Branches: refs/heads/trunk 756ec494d -> 7b7c4a7bb
KAFKA-3678: Removed sleep from streams integration tests Author: Eno Thereska <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1439 from enothereska/KAFKA-3678-timeouts1 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b7c4a7b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b7c4a7b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b7c4a7b Branch: refs/heads/trunk Commit: 7b7c4a7bb0fd25ddca4e4bdde9e605b3d5a1ba70 Parents: 756ec49 Author: Eno Thereska <[email protected]> Authored: Fri May 27 18:40:11 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 27 18:40:11 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/integration/FanoutIntegrationTest.java | 5 +---- .../streams/integration/InternalTopicIntegrationTest.java | 7 ++----- .../apache/kafka/streams/integration/JoinIntegrationTest.java | 5 +---- .../kafka/streams/integration/MapFunctionIntegrationTest.java | 5 +---- .../kafka/streams/integration/PassThroughIntegrationTest.java | 5 +---- .../kafka/streams/integration/WordCountIntegrationTest.java | 7 ++----- 6 files changed, 8 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 5199caa..6494533 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -97,6 +97,7 @@ public class FanoutIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A); KStream<byte[], String> stream2 = stream1.mapValues( @@ -119,10 +120,6 @@ public class FanoutIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(5000); - // // Step 2: Produce some input data to the input topic. // http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index e431b57..809a238 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -123,7 +124,7 @@ public class InternalTopicIntegrationTest { streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); - + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); @@ -149,10 +150,6 @@ public class InternalTopicIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(5000); - // // Step 2: Produce some input data to the input topic. // http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 4f318ec..9e9d366 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -141,6 +141,7 @@ public class JoinIntegrationTest { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Explicitly place the state directory under /tmp so that we can remove it via // `purgeLocalStreamsState` below. Once Streams is updated to expose the effective // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up @@ -217,10 +218,6 @@ public class JoinIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(10000); - // // Step 2: Publish user-region information. // http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java index 3c37aa1..2096d9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java @@ -79,6 +79,7 @@ public class MapFunctionIntegrationTest { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC); KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() { @@ -92,10 +93,6 @@ public class MapFunctionIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(5000); - // // Step 2: Produce some input data to the input topic. // http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java index e81d21c..4e6dcb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java @@ -72,6 +72,7 @@ public class PassThroughIntegrationTest { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Write the input data as-is to the output topic. builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC); @@ -79,10 +80,6 @@ public class PassThroughIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(5000); - // // Step 2: Produce some input data to the input topic. // http://git-wip-us.apache.org/repos/asf/kafka/blob/7b7c4a7b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java index c86409a..e00cd13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java @@ -83,6 +83,7 @@ public class WordCountIntegrationTest { streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Explicitly place the state directory under /tmp so that we can remove it via // `purgeLocalStreamsState` below. Once Streams is updated to expose the effective // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up @@ -115,11 +116,7 @@ public class WordCountIntegrationTest { KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - - // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all - // of the input data we produce below). - Thread.sleep(5000); - + // // Step 2: Produce some input data to the input topic. //
